1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{
14 Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
15 Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
16 TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
17};
18
/// Every ECS action name this service accepts.
///
/// Returned as-is by `supported_actions`; each entry has a matching arm in
/// the `handle` dispatch `match`, listed in the same order.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Clusters
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    // Task definitions
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Account settings
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    // Tasks
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    // Services
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    // Container instances
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    // Attributes
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    // Capacity providers
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    // Task protection
    "GetTaskProtection",
    "UpdateTaskProtection",
    // Task sets
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    // Command execution, state submissions and endpoint discovery
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    // Service deployments and revisions
    "StopServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
];
81
/// Reports whether `action` names an operation that changes stored state.
///
/// `handle` consults this to decide whether a successful request should be
/// followed by a snapshot write. Any name not in the table below — read-only
/// actions and unknown strings alike — yields `false`. Matching is exact and
/// case-sensitive.
fn is_mutating(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateCluster",
        "DeleteCluster",
        "UpdateCluster",
        "UpdateClusterSettings",
        "PutClusterCapacityProviders",
        "RegisterTaskDefinition",
        "DeregisterTaskDefinition",
        "DeleteTaskDefinitions",
        "TagResource",
        "UntagResource",
        "PutAccountSetting",
        "PutAccountSettingDefault",
        "DeleteAccountSetting",
        "RunTask",
        "StartTask",
        "StopTask",
        "CreateService",
        "UpdateService",
        "DeleteService",
        "RegisterContainerInstance",
        "DeregisterContainerInstance",
        "UpdateContainerAgent",
        "UpdateContainerInstancesState",
        "PutAttributes",
        "DeleteAttributes",
        "CreateCapacityProvider",
        "DeleteCapacityProvider",
        "UpdateCapacityProvider",
        "UpdateTaskProtection",
        "CreateTaskSet",
        "UpdateTaskSet",
        "DeleteTaskSet",
        "UpdateServicePrimaryTaskSet",
        "SubmitContainerStateChange",
        "SubmitTaskStateChange",
        "SubmitAttachmentStateChanges",
        "StopServiceDeployment",
    ];
    MUTATING_ACTIONS.contains(&action)
}
124
/// ECS service implementation: shared state plus optional collaborators that
/// are attached through the `with_*` builder methods.
pub struct EcsService {
    /// Shared ECS state that the action handlers read and write.
    state: SharedEcsState,
    /// Destination for serialized snapshots; while `None`, `save_snapshot`
    /// returns without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of `save_snapshot`, so snapshot writes issued by
    /// this service never overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Optional runtime set by `with_runtime`. NOTE(review): not consulted by
    /// the handlers visible in this part of the file — confirm its users.
    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
    /// Validator used by `check_pass_role`; while `None`, every role ARN is
    /// accepted.
    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
}
132
133impl EcsService {
134 pub fn new(state: SharedEcsState) -> Self {
135 Self {
136 state,
137 snapshot_store: None,
138 snapshot_lock: Arc::new(AsyncMutex::new(())),
139 runtime: None,
140 role_trust_validator: None,
141 }
142 }
143
144 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
145 self.snapshot_store = Some(store);
146 self
147 }
148
149 pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
150 self.runtime = Some(runtime);
151 self
152 }
153
154 pub fn with_role_trust_validator(
155 mut self,
156 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
157 ) -> Self {
158 self.role_trust_validator = Some(validator);
159 self
160 }
161
162 fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
163 let Some(ref validator) = self.role_trust_validator else {
164 return Ok(());
165 };
166 if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
167 return Err(AwsServiceError::aws_error(
168 StatusCode::BAD_REQUEST,
169 "InvalidParameterException",
170 err.to_string(),
171 ));
172 }
173 Ok(())
174 }
175
176 pub fn state_handle(&self) -> &SharedEcsState {
177 &self.state
178 }
179
180 async fn save_snapshot(&self) {
181 let Some(store) = self.snapshot_store.clone() else {
182 return;
183 };
184 let _guard = self.snapshot_lock.lock().await;
185 let snapshot = EcsSnapshot {
186 schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
187 accounts: Some(self.state.read().clone()),
188 };
189 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
190 let bytes = serde_json::to_vec(&snapshot)
191 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
192 store.save(&bytes)
193 })
194 .await;
195 match join {
196 Ok(Ok(())) => {}
197 Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
198 Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
199 }
200 }
201}
202
203#[async_trait]
204impl AwsService for EcsService {
205 fn service_name(&self) -> &str {
206 "ecs"
207 }
208
209 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
210 let mutates = is_mutating(request.action.as_str());
211 let result = match request.action.as_str() {
212 "CreateCluster" => self.create_cluster(&request),
213 "DescribeClusters" => self.describe_clusters(&request),
214 "DeleteCluster" => self.delete_cluster(&request),
215 "ListClusters" => self.list_clusters(&request),
216 "UpdateCluster" => self.update_cluster(&request),
217 "UpdateClusterSettings" => self.update_cluster_settings(&request),
218 "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
219 "RegisterTaskDefinition" => self.register_task_definition(&request),
220 "DescribeTaskDefinition" => self.describe_task_definition(&request),
221 "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
222 "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
223 "ListTaskDefinitions" => self.list_task_definitions(&request),
224 "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
225 "TagResource" => self.tag_resource(&request),
226 "UntagResource" => self.untag_resource(&request),
227 "ListTagsForResource" => self.list_tags_for_resource(&request),
228 "PutAccountSetting" => self.put_account_setting(&request),
229 "PutAccountSettingDefault" => self.put_account_setting_default(&request),
230 "DeleteAccountSetting" => self.delete_account_setting(&request),
231 "ListAccountSettings" => self.list_account_settings(&request),
232 "RunTask" => self.run_task(&request),
233 "StartTask" => self.start_task(&request),
234 "StopTask" => self.stop_task(&request).await,
235 "DescribeTasks" => self.describe_tasks(&request),
236 "ListTasks" => self.list_tasks(&request),
237 "CreateService" => self.create_service(&request),
238 "UpdateService" => self.update_service(&request),
239 "DeleteService" => self.delete_service(&request).await,
240 "DescribeServices" => self.describe_services(&request),
241 "ListServices" => self.list_services(&request),
242 "ListServicesByNamespace" => self.list_services_by_namespace(&request),
243 "RegisterContainerInstance" => self.register_container_instance(&request),
244 "DeregisterContainerInstance" => self.deregister_container_instance(&request),
245 "DescribeContainerInstances" => self.describe_container_instances(&request),
246 "ListContainerInstances" => self.list_container_instances(&request),
247 "UpdateContainerAgent" => self.update_container_agent(&request),
248 "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
249 "PutAttributes" => self.put_attributes(&request),
250 "DeleteAttributes" => self.delete_attributes(&request),
251 "ListAttributes" => self.list_attributes(&request),
252 "CreateCapacityProvider" => self.create_capacity_provider(&request),
253 "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
254 "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
255 "UpdateCapacityProvider" => self.update_capacity_provider(&request),
256 "GetTaskProtection" => self.get_task_protection(&request),
257 "UpdateTaskProtection" => self.update_task_protection(&request),
258 "CreateTaskSet" => self.create_task_set(&request),
259 "UpdateTaskSet" => self.update_task_set(&request),
260 "DeleteTaskSet" => self.delete_task_set(&request),
261 "DescribeTaskSets" => self.describe_task_sets(&request),
262 "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
263 "ExecuteCommand" => self.execute_command(&request).await,
264 "SubmitContainerStateChange" => self.submit_container_state_change(&request),
265 "SubmitTaskStateChange" => self.submit_task_state_change(&request),
266 "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
267 "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
268 "StopServiceDeployment" => self.stop_service_deployment(&request),
269 "ListServiceDeployments" => self.list_service_deployments(&request),
270 "DescribeServiceDeployments" => self.describe_service_deployments(&request),
271 "DescribeServiceRevisions" => self.describe_service_revisions(&request),
272 _ => Err(AwsServiceError::action_not_implemented(
273 "ecs",
274 &request.action,
275 )),
276 };
277 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
278 self.save_snapshot().await;
279 }
280 result
281 }
282
283 fn supported_actions(&self) -> &[&str] {
284 SUPPORTED_ACTIONS
285 }
286}
287
288fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
291 body.get(field)
292 .and_then(|v| v.as_str())
293 .ok_or_else(|| client_exception(format!("Missing required field: {field}")))
294}
295
296fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
297 body.get(field).and_then(|v| v.as_str())
298}
299
/// Builds a `ClientException` error with HTTP status 400 carrying `message`.
fn client_exception(message: impl Into<String>) -> AwsServiceError {
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
}
303
/// Builds an `InvalidParameterException` error with HTTP status 400 carrying
/// `message`.
fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
    AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "InvalidParameterException",
        message,
    )
}
311
/// Builds the `ClusterNotFoundException` (HTTP 400) that handlers return when
/// `name` is not present in the cluster map.
///
/// Note the message wording: it reports the cluster as "inactive" and appends
/// the name that was looked up.
fn cluster_not_found(name: &str) -> AwsServiceError {
    AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ClusterNotFoundException",
        format!("The referenced cluster was inactive: {name}"),
    )
}
319
/// Builds the `ClusterContainsServicesException` (HTTP 400) that
/// `delete_cluster` returns while the cluster's active-service count is
/// above zero.
fn cluster_contains_services() -> AwsServiceError {
    AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ClusterContainsServicesException",
        "The specified cluster still contains active services",
    )
}
327
/// Builds the `ClusterContainsTasksException` (HTTP 400) that
/// `delete_cluster` returns while the cluster still counts running or pending
/// tasks.
fn cluster_contains_tasks() -> AwsServiceError {
    AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ClusterContainsTasksException",
        "The specified cluster still contains active tasks",
    )
}
335
/// Builds the error returned when a task-definition reference cannot be
/// resolved to a stored revision.
///
/// It is reported as a generic `ClientException` (HTTP 400), not under a
/// dedicated not-found code; `family_rev` is echoed back in the message.
fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
    AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ClientException",
        format!("Unable to describe task definition: {family_rev}"),
    )
}
343
344fn parse_tags(body: &Value) -> Vec<TagEntry> {
345 body.get("tags")
346 .and_then(|v| v.as_array())
347 .map(|arr| {
348 arr.iter()
349 .filter_map(|t| {
350 let k = t.get("key").and_then(|v| v.as_str())?;
351 let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
352 Some(TagEntry {
353 key: k.to_string(),
354 value: v.to_string(),
355 })
356 })
357 .collect()
358 })
359 .unwrap_or_default()
360}
361
362fn tags_json(tags: &[TagEntry]) -> Value {
363 Value::Array(
364 tags.iter()
365 .map(|t| json!({"key": t.key, "value": t.value}))
366 .collect(),
367 )
368}
369
370fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
371 for new_tag in incoming {
372 if let Some(existing) = current.iter_mut().find(|t| t.key == new_tag.key) {
373 existing.value = new_tag.value;
374 } else {
375 current.push(new_tag);
376 }
377 }
378}
379
380fn cluster_to_json(cluster: &Cluster) -> Value {
381 json!({
382 "clusterArn": cluster.cluster_arn,
383 "clusterName": cluster.cluster_name,
384 "status": cluster.status,
385 "registeredContainerInstancesCount": cluster.registered_container_instances_count,
386 "runningTasksCount": cluster.running_tasks_count,
387 "pendingTasksCount": cluster.pending_tasks_count,
388 "activeServicesCount": cluster.active_services_count,
389 "statistics": cluster.statistics,
390 "tags": tags_json(&cluster.tags),
391 "settings": cluster.settings,
392 "configuration": cluster.configuration,
393 "capacityProviders": cluster.capacity_providers,
394 "defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
395 "attachments": cluster.attachments,
396 "attachmentsStatus": cluster.attachments_status,
397 "serviceConnectDefaults": cluster.service_connect_defaults,
398 })
399}
400
401fn task_definition_to_json(td: &TaskDefinition) -> Value {
402 let mut map = Map::new();
403 map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
404 map.insert("family".into(), json!(td.family));
405 map.insert("revision".into(), json!(td.revision));
406 map.insert("status".into(), json!(td.status));
407 map.insert(
408 "containerDefinitions".into(),
409 Value::Array(td.container_definitions.clone()),
410 );
411 map.insert("compatibilities".into(), json!(td.compatibilities));
412 map.insert(
413 "requiresCompatibilities".into(),
414 json!(td.requires_compatibilities),
415 );
416 map.insert("volumes".into(), Value::Array(td.volumes.clone()));
417 map.insert(
418 "placementConstraints".into(),
419 Value::Array(td.placement_constraints.clone()),
420 );
421 map.insert(
422 "requiresAttributes".into(),
423 Value::Array(td.requires_attributes.clone()),
424 );
425 map.insert(
426 "inferenceAccelerators".into(),
427 Value::Array(td.inference_accelerators.clone()),
428 );
429 if let Some(ref x) = td.network_mode {
430 map.insert("networkMode".into(), json!(x));
431 }
432 if let Some(ref x) = td.cpu {
433 map.insert("cpu".into(), json!(x));
434 }
435 if let Some(ref x) = td.memory {
436 map.insert("memory".into(), json!(x));
437 }
438 if let Some(ref x) = td.task_role_arn {
439 map.insert("taskRoleArn".into(), json!(x));
440 }
441 if let Some(ref x) = td.execution_role_arn {
442 map.insert("executionRoleArn".into(), json!(x));
443 }
444 if let Some(ref x) = td.pid_mode {
445 map.insert("pidMode".into(), json!(x));
446 }
447 if let Some(ref x) = td.ipc_mode {
448 map.insert("ipcMode".into(), json!(x));
449 }
450 if let Some(ref x) = td.proxy_configuration {
451 map.insert("proxyConfiguration".into(), x.clone());
452 }
453 if let Some(ref x) = td.ephemeral_storage {
454 map.insert("ephemeralStorage".into(), x.clone());
455 }
456 if let Some(ref x) = td.runtime_platform {
457 map.insert("runtimePlatform".into(), x.clone());
458 }
459 if let Some(ref x) = td.registered_by {
460 map.insert("registeredBy".into(), json!(x));
461 }
462 map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
463 if let Some(ts) = td.deregistered_at {
464 map.insert("deregisteredAt".into(), json!(ts.timestamp()));
465 }
466 if let Some(enabled) = td.enable_fault_injection {
467 map.insert("enableFaultInjection".into(), json!(enabled));
468 }
469 Value::Object(map)
470}
471
472fn resolve_service_key(state: &EcsState, tail: &str) -> Option<String> {
478 if tail.contains('/') {
479 return state.services.contains_key(tail).then(|| tail.to_string());
480 }
481 let suffix = format!("/{tail}");
482 state
483 .services
484 .keys()
485 .find(|k| k.ends_with(&suffix))
486 .cloned()
487}
488
489fn resolve_container_instance_key(state: &EcsState, tail: &str) -> Option<String> {
492 if tail.contains('/') {
493 return state
494 .container_instances
495 .contains_key(tail)
496 .then(|| tail.to_string());
497 }
498 let suffix = format!("/{tail}");
499 state
500 .container_instances
501 .keys()
502 .find(|k| k.ends_with(&suffix))
503 .cloned()
504}
505
506fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
510 let rest = arn
511 .strip_prefix("arn:aws:ecs:")
512 .ok_or_else(|| invalid_parameter(format!("Malformed ECS ARN: {arn}")))?;
513 let mut parts = rest.splitn(3, ':');
516 let _region = parts
517 .next()
518 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
519 let account = parts
520 .next()
521 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
522 let resource = parts
523 .next()
524 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
525 let (resource_type, tail) = resource
526 .split_once('/')
527 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
528 Ok((
529 account.to_string(),
530 resource_type.to_string(),
531 tail.to_string(),
532 ))
533}
534
/// Splits `family:revision` at the last `:`.
///
/// When the text after the last `:` parses as an `i32`, returns the part
/// before it together with that number. In every other case (no `:`, or a
/// non-numeric or out-of-range suffix) the whole input is returned as the
/// family with no revision.
fn parse_family_revision(input: &str) -> (String, Option<i32>) {
    let split = input
        .rsplit_once(':')
        .and_then(|(family, rev)| rev.parse::<i32>().ok().map(|n| (family, n)));
    match split {
        Some((family, n)) => (family.to_owned(), Some(n)),
        None => (input.to_owned(), None),
    }
}
546
547fn resolve_task_definition_ref(
551 input: &str,
552) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
553 if input.starts_with("arn:aws:ecs:") {
554 let (account, resource_type, tail) = decode_ecs_arn(input)?;
555 if resource_type != "task-definition" {
556 return Err(invalid_parameter(format!(
557 "Expected task-definition ARN: {input}"
558 )));
559 }
560 let (family, rev) = parse_family_revision(&tail);
561 Ok((Some(account), family, rev))
562 } else {
563 let (family, rev) = parse_family_revision(input);
564 Ok((None, family, rev))
565 }
566}
567
568fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
569 if let Ok((Some(account), _, _)) = resolve_task_definition_ref(td_ref) {
570 account
571 } else {
572 request.account_id.clone()
573 }
574}
575
576fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
577 if let Some(input) = cluster_ref {
578 if input.starts_with("arn:aws:ecs:") {
579 if let Ok((account, _, _)) = decode_ecs_arn(input) {
580 return account;
581 }
582 }
583 }
584 request.account_id.clone()
585}
586
587fn latest_active_revision(
588 revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
589) -> Option<&TaskDefinition> {
590 revisions.values().rev().find(|td| td.status == "ACTIVE")
591}
592
593impl EcsService {
596 fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
597 let body = request.json_body();
598 let cluster_name = opt_str(&body, "clusterName")
599 .unwrap_or("default")
600 .to_string();
601 let tags = parse_tags(&body);
602 let settings: Vec<Value> = body
603 .get("settings")
604 .and_then(|v| v.as_array())
605 .cloned()
606 .unwrap_or_default();
607 let configuration = body.get("configuration").cloned();
608 let capacity_providers: Vec<String> = body
609 .get("capacityProviders")
610 .and_then(|v| v.as_array())
611 .map(|arr| {
612 arr.iter()
613 .filter_map(|v| v.as_str().map(String::from))
614 .collect()
615 })
616 .unwrap_or_default();
617 let default_strategy: Vec<Value> = body
618 .get("defaultCapacityProviderStrategy")
619 .and_then(|v| v.as_array())
620 .cloned()
621 .unwrap_or_default();
622 let service_connect = body.get("serviceConnectDefaults").cloned();
623
624 let account = request.account_id.clone();
625 let mut accounts = self.state.write();
626 let state = accounts.get_or_create(&account);
627 let arn = state.cluster_arn(&cluster_name);
628 let mut cluster = Cluster::new(&cluster_name, arn);
629 cluster.tags = tags;
630 cluster.settings = settings;
631 cluster.configuration = configuration;
632 cluster.capacity_providers = capacity_providers;
633 cluster.default_capacity_provider_strategy = default_strategy;
634 cluster.service_connect_defaults = service_connect;
635 state.clusters.insert(cluster_name.clone(), cluster.clone());
639
640 Ok(AwsResponse::ok_json(json!({
641 "cluster": cluster_to_json(&cluster),
642 })))
643 }
644
645 fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
646 let body = request.json_body();
647 let names: Vec<String> = body
648 .get("clusters")
649 .and_then(|v| v.as_array())
650 .map(|arr| {
651 arr.iter()
652 .filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
653 .collect()
654 })
655 .unwrap_or_else(|| vec!["default".to_string()]);
656
657 let account = request.account_id.clone();
658 let accounts = self.state.read();
659 let mut found = Vec::new();
660 let mut failures = Vec::new();
661 if let Some(state) = accounts.get(&account) {
662 for name in &names {
663 match state.clusters.get(name) {
664 Some(c) => found.push(cluster_to_json(c)),
665 None => failures.push(json!({
666 "arn": state.cluster_arn(name),
667 "reason": "MISSING",
668 })),
669 }
670 }
671 } else {
672 for name in &names {
673 failures.push(json!({
674 "arn": format!(
675 "arn:aws:ecs:{}:{}:cluster/{}",
676 accounts.region(),
677 account,
678 name
679 ),
680 "reason": "MISSING",
681 }));
682 }
683 }
684 Ok(AwsResponse::ok_json(json!({
685 "clusters": found,
686 "failures": failures,
687 })))
688 }
689
690 fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
691 let body = request.json_body();
692 let cluster_ref = opt_str(&body, "cluster");
693 let name = EcsState::resolve_cluster_name(cluster_ref);
694 let account = target_account_for_cluster(request, cluster_ref);
695
696 let mut accounts = self.state.write();
697 let state = accounts.get_or_create(&account);
698 let cluster = state
699 .clusters
700 .get_mut(&name)
701 .ok_or_else(|| cluster_not_found(&name))?;
702 if cluster.active_services_count > 0 {
703 return Err(cluster_contains_services());
704 }
705 if cluster.running_tasks_count > 0 || cluster.pending_tasks_count > 0 {
706 return Err(cluster_contains_tasks());
707 }
708 cluster.status = "INACTIVE".to_string();
709 let snapshot = cluster.clone();
710 state.clusters.remove(&name);
715 Ok(AwsResponse::ok_json(json!({
716 "cluster": cluster_to_json(&snapshot),
717 })))
718 }
719
720 fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
721 let body = request.json_body();
722 let max_results = body
723 .get("maxResults")
724 .and_then(|v| v.as_i64())
725 .filter(|n| (1..=100).contains(n))
726 .map(|n| n as usize)
727 .unwrap_or(100);
728 let next_token = opt_str(&body, "nextToken").unwrap_or("");
729
730 let account = request.account_id.clone();
731 let accounts = self.state.read();
732 let arns: Vec<String> = match accounts.get(&account) {
733 Some(state) => state
734 .clusters
735 .values()
736 .map(|c| c.cluster_arn.clone())
737 .collect(),
738 None => Vec::new(),
739 };
740 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
741 let end = (start + max_results).min(arns.len());
742 let page = arns[start..end].to_vec();
743 let next = if end < arns.len() {
744 Some(end.to_string())
745 } else {
746 None
747 };
748 let mut out = json!({ "clusterArns": page });
749 if let Some(n) = next {
750 out.as_object_mut()
751 .unwrap()
752 .insert("nextToken".into(), json!(n));
753 }
754 Ok(AwsResponse::ok_json(out))
755 }
756
757 fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
758 let body = request.json_body();
759 let cluster_ref = req_str(&body, "cluster")?;
760 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
761 let account = target_account_for_cluster(request, Some(cluster_ref));
762
763 let mut accounts = self.state.write();
764 let state = accounts.get_or_create(&account);
765 let cluster = state
766 .clusters
767 .get_mut(&name)
768 .ok_or_else(|| cluster_not_found(&name))?;
769 if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
770 cluster.settings = settings.clone();
771 }
772 if let Some(cfg) = body.get("configuration") {
773 cluster.configuration = Some(cfg.clone());
774 }
775 if let Some(sc) = body.get("serviceConnectDefaults") {
776 cluster.service_connect_defaults = Some(sc.clone());
777 }
778 let snapshot = cluster.clone();
779 Ok(AwsResponse::ok_json(json!({
780 "cluster": cluster_to_json(&snapshot),
781 })))
782 }
783
784 fn update_cluster_settings(
785 &self,
786 request: &AwsRequest,
787 ) -> Result<AwsResponse, AwsServiceError> {
788 let body = request.json_body();
789 let cluster_ref = req_str(&body, "cluster")?;
790 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
791 let account = target_account_for_cluster(request, Some(cluster_ref));
792 let settings: Vec<Value> = body
793 .get("settings")
794 .and_then(|v| v.as_array())
795 .cloned()
796 .unwrap_or_default();
797
798 let mut accounts = self.state.write();
799 let state = accounts.get_or_create(&account);
800 let cluster = state
801 .clusters
802 .get_mut(&name)
803 .ok_or_else(|| cluster_not_found(&name))?;
804 cluster.settings = settings;
805 let snapshot = cluster.clone();
806 Ok(AwsResponse::ok_json(json!({
807 "cluster": cluster_to_json(&snapshot),
808 })))
809 }
810
811 fn put_cluster_capacity_providers(
812 &self,
813 request: &AwsRequest,
814 ) -> Result<AwsResponse, AwsServiceError> {
815 let body = request.json_body();
816 let cluster_ref = req_str(&body, "cluster")?;
817 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
818 let account = target_account_for_cluster(request, Some(cluster_ref));
819 let capacity_providers: Vec<String> = body
820 .get("capacityProviders")
821 .and_then(|v| v.as_array())
822 .map(|arr| {
823 arr.iter()
824 .filter_map(|v| v.as_str().map(String::from))
825 .collect()
826 })
827 .ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
828 let default_strategy: Vec<Value> = body
829 .get("defaultCapacityProviderStrategy")
830 .and_then(|v| v.as_array())
831 .cloned()
832 .ok_or_else(|| {
833 client_exception("Missing required field: defaultCapacityProviderStrategy")
834 })?;
835
836 let mut accounts = self.state.write();
837 let state = accounts.get_or_create(&account);
838 let cluster = state
839 .clusters
840 .get_mut(&name)
841 .ok_or_else(|| cluster_not_found(&name))?;
842 cluster.capacity_providers = capacity_providers;
843 cluster.default_capacity_provider_strategy = default_strategy;
844 let snapshot = cluster.clone();
845 Ok(AwsResponse::ok_json(json!({
846 "cluster": cluster_to_json(&snapshot),
847 })))
848 }
849}
850
851impl EcsService {
854 fn register_task_definition(
855 &self,
856 request: &AwsRequest,
857 ) -> Result<AwsResponse, AwsServiceError> {
858 let body = request.json_body();
859 let family = req_str(&body, "family")?.to_string();
860 validate_family_name(&family)?;
861 let container_definitions = body
862 .get("containerDefinitions")
863 .and_then(|v| v.as_array())
864 .cloned()
865 .ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
866 if container_definitions.is_empty() {
867 return Err(client_exception(
868 "Task definition must have at least one container",
869 ));
870 }
871 for cd in &container_definitions {
872 if cd
873 .get("name")
874 .and_then(|v| v.as_str())
875 .unwrap_or("")
876 .is_empty()
877 {
878 return Err(client_exception(
879 "Container definition is missing required field: name",
880 ));
881 }
882 if cd
883 .get("image")
884 .and_then(|v| v.as_str())
885 .unwrap_or("")
886 .is_empty()
887 {
888 return Err(client_exception(
889 "Container definition is missing required field: image",
890 ));
891 }
892 }
893 if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
897 self.check_pass_role(&request.account_id, role_arn)?;
898 }
899 if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
900 self.check_pass_role(&request.account_id, role_arn)?;
901 }
902
903 let tags = parse_tags(&body);
904 let requires_compatibilities: Vec<String> = body
905 .get("requiresCompatibilities")
906 .and_then(|v| v.as_array())
907 .map(|arr| {
908 arr.iter()
909 .filter_map(|v| v.as_str().map(String::from))
910 .collect()
911 })
912 .unwrap_or_default();
913 let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
917
918 let account = request.account_id.clone();
919 let mut accounts = self.state.write();
920 let state = accounts.get_or_create(&account);
921 let revision = state.allocate_revision(&family);
922 let arn = state.task_definition_arn(&family, revision);
923 let td = TaskDefinition {
924 family: family.clone(),
925 revision,
926 task_definition_arn: arn,
927 container_definitions,
928 status: "ACTIVE".to_string(),
929 task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
930 execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
931 network_mode: opt_str(&body, "networkMode").map(String::from),
932 requires_compatibilities,
933 compatibilities,
934 cpu: opt_str(&body, "cpu").map(String::from),
935 memory: opt_str(&body, "memory").map(String::from),
936 pid_mode: opt_str(&body, "pidMode").map(String::from),
937 ipc_mode: opt_str(&body, "ipcMode").map(String::from),
938 volumes: body
939 .get("volumes")
940 .and_then(|v| v.as_array())
941 .cloned()
942 .unwrap_or_default(),
943 placement_constraints: body
944 .get("placementConstraints")
945 .and_then(|v| v.as_array())
946 .cloned()
947 .unwrap_or_default(),
948 proxy_configuration: body.get("proxyConfiguration").cloned(),
949 inference_accelerators: body
950 .get("inferenceAccelerators")
951 .and_then(|v| v.as_array())
952 .cloned()
953 .unwrap_or_default(),
954 ephemeral_storage: body.get("ephemeralStorage").cloned(),
955 runtime_platform: body.get("runtimePlatform").cloned(),
956 requires_attributes: Vec::new(),
957 registered_at: Utc::now(),
958 registered_by: request.principal.as_ref().map(|p| p.arn.clone()).or(Some(
959 Arn::global("iam", &state.account_id, "root").to_string(),
960 )),
961 deregistered_at: None,
962 tags: tags.clone(),
963 enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
964 };
965 let td_json = task_definition_to_json(&td);
966 state
967 .task_definitions
968 .entry(family.clone())
969 .or_default()
970 .insert(revision, td);
971
972 Ok(AwsResponse::ok_json(json!({
973 "taskDefinition": td_json,
974 "tags": tags_json(&tags),
975 })))
976 }
977
978 fn describe_task_definition(
979 &self,
980 request: &AwsRequest,
981 ) -> Result<AwsResponse, AwsServiceError> {
982 let body = request.json_body();
983 let td_ref = req_str(&body, "taskDefinition")?;
984 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
985 let include_tags = body
986 .get("include")
987 .and_then(|v| v.as_array())
988 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
989 .unwrap_or(false);
990
991 let account = target_account_for_task_definition(request, td_ref);
992 let accounts = self.state.read();
993 let state = accounts
994 .get(&account)
995 .ok_or_else(|| task_definition_not_found(td_ref))?;
996 let revisions = state
997 .task_definitions
998 .get(&family)
999 .ok_or_else(|| task_definition_not_found(td_ref))?;
1000 let td = match rev {
1001 Some(n) => revisions
1002 .get(&n)
1003 .ok_or_else(|| task_definition_not_found(td_ref))?,
1004 None => latest_active_revision(revisions)
1005 .ok_or_else(|| task_definition_not_found(td_ref))?,
1006 };
1007 let mut out = json!({"taskDefinition": task_definition_to_json(td)});
1008 if include_tags {
1009 out.as_object_mut()
1010 .unwrap()
1011 .insert("tags".into(), tags_json(&td.tags));
1012 }
1013 Ok(AwsResponse::ok_json(out))
1014 }
1015
1016 fn deregister_task_definition(
1017 &self,
1018 request: &AwsRequest,
1019 ) -> Result<AwsResponse, AwsServiceError> {
1020 let body = request.json_body();
1021 let td_ref = req_str(&body, "taskDefinition")?;
1022 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1023 let rev =
1024 rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
1025
1026 let account = target_account_for_task_definition(request, td_ref);
1027 let mut accounts = self.state.write();
1028 let state = accounts.get_or_create(&account);
1029 let revisions = state
1030 .task_definitions
1031 .get_mut(&family)
1032 .ok_or_else(|| task_definition_not_found(td_ref))?;
1033 let td = revisions
1034 .get_mut(&rev)
1035 .ok_or_else(|| task_definition_not_found(td_ref))?;
1036 td.status = "INACTIVE".to_string();
1037 td.deregistered_at = Some(Utc::now());
1038 let snapshot = td.clone();
1039 Ok(AwsResponse::ok_json(json!({
1040 "taskDefinition": task_definition_to_json(&snapshot),
1041 })))
1042 }
1043
1044 fn delete_task_definitions(
1045 &self,
1046 request: &AwsRequest,
1047 ) -> Result<AwsResponse, AwsServiceError> {
1048 let body = request.json_body();
1049 let refs: Vec<String> = body
1050 .get("taskDefinitions")
1051 .and_then(|v| v.as_array())
1052 .map(|arr| {
1053 arr.iter()
1054 .filter_map(|v| v.as_str().map(String::from))
1055 .collect()
1056 })
1057 .ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
1058 if refs.is_empty() {
1059 return Err(client_exception("taskDefinitions must not be empty"));
1060 }
1061
1062 let mut deleted = Vec::new();
1063 let mut failures = Vec::new();
1064 let account = request.account_id.clone();
1065 let mut accounts = self.state.write();
1066 let state = accounts.get_or_create(&account);
1067 for input in refs {
1068 let parsed = match resolve_task_definition_ref(&input) {
1069 Ok((_, family, Some(rev))) => Some((family, rev)),
1070 Ok(_) => None,
1071 Err(_) => None,
1072 };
1073 let Some((family, rev)) = parsed else {
1074 failures.push(json!({
1075 "arn": input,
1076 "reason": "INVALID_REFERENCE",
1077 "detail": "Expected family:revision or full task-definition ARN",
1078 }));
1079 continue;
1080 };
1081 let Some(revisions) = state.task_definitions.get_mut(&family) else {
1082 failures.push(json!({"arn": input, "reason": "MISSING"}));
1083 continue;
1084 };
1085 let Some(td) = revisions.get_mut(&rev) else {
1086 failures.push(json!({"arn": input, "reason": "MISSING"}));
1087 continue;
1088 };
1089 if td.status == "ACTIVE" {
1090 failures.push(json!({
1091 "arn": td.task_definition_arn.clone(),
1092 "reason": "MUST_BE_INACTIVE",
1093 "detail": "Task definitions must be deregistered before they can be deleted",
1094 }));
1095 continue;
1096 }
1097 td.status = "DELETE_IN_PROGRESS".to_string();
1098 deleted.push(task_definition_to_json(td));
1099 }
1100 Ok(AwsResponse::ok_json(json!({
1101 "taskDefinitions": deleted,
1102 "failures": failures,
1103 })))
1104 }
1105
1106 fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1107 let body = request.json_body();
1108 let family_prefix = opt_str(&body, "familyPrefix");
1109 let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1110 let sort = opt_str(&body, "sort").unwrap_or("ASC");
1111 let max_results = body
1112 .get("maxResults")
1113 .and_then(|v| v.as_i64())
1114 .filter(|n| (1..=100).contains(n))
1115 .map(|n| n as usize)
1116 .unwrap_or(100);
1117 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1118
1119 let account = request.account_id.clone();
1120 let accounts = self.state.read();
1121 let mut arns: Vec<String> = Vec::new();
1122 if let Some(state) = accounts.get(&account) {
1123 for (family, revisions) in &state.task_definitions {
1124 if let Some(prefix) = family_prefix {
1125 if !family.starts_with(prefix) {
1126 continue;
1127 }
1128 }
1129 for td in revisions.values() {
1130 if td.status == status {
1131 arns.push(td.task_definition_arn.clone());
1132 }
1133 }
1134 }
1135 }
1136 if sort == "DESC" {
1137 arns.sort();
1138 arns.reverse();
1139 } else {
1140 arns.sort();
1141 }
1142 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1143 let end = (start + max_results).min(arns.len());
1144 let page = arns[start..end].to_vec();
1145 let mut out = json!({"taskDefinitionArns": page});
1146 if end < arns.len() {
1147 out.as_object_mut()
1148 .unwrap()
1149 .insert("nextToken".into(), json!(end.to_string()));
1150 }
1151 Ok(AwsResponse::ok_json(out))
1152 }
1153
1154 fn list_task_definition_families(
1155 &self,
1156 request: &AwsRequest,
1157 ) -> Result<AwsResponse, AwsServiceError> {
1158 let body = request.json_body();
1159 let family_prefix = opt_str(&body, "familyPrefix");
1160 let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1161 let max_results = body
1162 .get("maxResults")
1163 .and_then(|v| v.as_i64())
1164 .filter(|n| (1..=100).contains(n))
1165 .map(|n| n as usize)
1166 .unwrap_or(100);
1167 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1168
1169 let account = request.account_id.clone();
1170 let accounts = self.state.read();
1171 let mut families: Vec<String> = Vec::new();
1172 if let Some(state) = accounts.get(&account) {
1173 for (family, revisions) in &state.task_definitions {
1174 if let Some(prefix) = family_prefix {
1175 if !family.starts_with(prefix) {
1176 continue;
1177 }
1178 }
1179 let matches_status = match status {
1180 "ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
1181 "INACTIVE" => revisions
1182 .values()
1183 .all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
1184 "ALL" => true,
1185 _ => revisions.values().any(|td| td.status == status),
1186 };
1187 if matches_status {
1188 families.push(family.clone());
1189 }
1190 }
1191 }
1192 families.sort();
1193 let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
1194 let end = (start + max_results).min(families.len());
1195 let page = families[start..end].to_vec();
1196 let mut out = json!({"families": page});
1197 if end < families.len() {
1198 out.as_object_mut()
1199 .unwrap()
1200 .insert("nextToken".into(), json!(end.to_string()));
1201 }
1202 Ok(AwsResponse::ok_json(out))
1203 }
1204}
1205
1206fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
1207 if family.is_empty() || family.len() > 255 {
1208 return Err(invalid_parameter(
1209 "Task definition family must be 1-255 characters",
1210 ));
1211 }
1212 let ok = family
1213 .chars()
1214 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
1215 if !ok {
1216 return Err(invalid_parameter(
1217 "Task definition family may only contain letters, numbers, hyphens, and underscores",
1218 ));
1219 }
1220 Ok(())
1221}
1222
1223impl EcsService {
1226 fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1227 let body = request.json_body();
1228 let arn = req_str(&body, "resourceArn")?.to_string();
1229 let tags = parse_tags(&body);
1230 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1231 let mut accounts = self.state.write();
1232 let state = accounts.get_or_create(&account);
1233 match resource_type.as_str() {
1234 "cluster" => {
1235 let cluster = state
1236 .clusters
1237 .get_mut(&tail)
1238 .ok_or_else(|| resource_not_found(&arn))?;
1239 merge_tags(&mut cluster.tags, tags);
1240 }
1241 "task-definition" => {
1242 let (family, rev) = parse_family_revision(&tail);
1243 let rev = rev.ok_or_else(|| {
1244 invalid_parameter("task-definition ARN must include revision")
1245 })?;
1246 let td = state
1247 .task_definitions
1248 .get_mut(&family)
1249 .and_then(|m| m.get_mut(&rev))
1250 .ok_or_else(|| resource_not_found(&arn))?;
1251 merge_tags(&mut td.tags, tags);
1252 }
1253 "service" => {
1254 let key =
1255 resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1256 let svc = state.services.get_mut(&key).expect("resolved key exists");
1257 merge_tags(&mut svc.tags, tags);
1258 }
1259 "task" => {
1260 let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1261 let task = state
1262 .tasks
1263 .get_mut(&task_id)
1264 .ok_or_else(|| resource_not_found(&arn))?;
1265 merge_tags(&mut task.tags, tags);
1266 }
1267 "task-set" => {
1268 let ts = state
1269 .task_sets
1270 .get_mut(&tail)
1271 .ok_or_else(|| resource_not_found(&arn))?;
1272 merge_tags(&mut ts.tags, tags);
1273 }
1274 "container-instance" => {
1275 let key = resolve_container_instance_key(state, &tail)
1276 .ok_or_else(|| resource_not_found(&arn))?;
1277 let ci = state
1278 .container_instances
1279 .get_mut(&key)
1280 .expect("resolved key exists");
1281 merge_tags(&mut ci.tags, tags);
1282 }
1283 "capacity-provider" => {
1284 let cp = state
1285 .capacity_providers
1286 .get_mut(&tail)
1287 .ok_or_else(|| resource_not_found(&arn))?;
1288 merge_tags(&mut cp.tags, tags);
1289 }
1290 other => {
1291 return Err(invalid_parameter(format!(
1292 "Unknown ECS resource type: {other}"
1293 )));
1294 }
1295 }
1296 Ok(AwsResponse::ok_json(json!({})))
1297 }
1298
1299 fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1300 let body = request.json_body();
1301 let arn = req_str(&body, "resourceArn")?.to_string();
1302 let keys: Vec<String> = body
1303 .get("tagKeys")
1304 .and_then(|v| v.as_array())
1305 .map(|arr| {
1306 arr.iter()
1307 .filter_map(|v| v.as_str().map(String::from))
1308 .collect()
1309 })
1310 .unwrap_or_default();
1311 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1312 let mut accounts = self.state.write();
1313 let state = accounts.get_or_create(&account);
1314 match resource_type.as_str() {
1315 "cluster" => {
1316 let cluster = state
1317 .clusters
1318 .get_mut(&tail)
1319 .ok_or_else(|| resource_not_found(&arn))?;
1320 cluster.tags.retain(|t| !keys.contains(&t.key));
1321 }
1322 "task-definition" => {
1323 let (family, rev) = parse_family_revision(&tail);
1324 let rev = rev.ok_or_else(|| {
1325 invalid_parameter("task-definition ARN must include revision")
1326 })?;
1327 let td = state
1328 .task_definitions
1329 .get_mut(&family)
1330 .and_then(|m| m.get_mut(&rev))
1331 .ok_or_else(|| resource_not_found(&arn))?;
1332 td.tags.retain(|t| !keys.contains(&t.key));
1333 }
1334 "service" => {
1335 let key =
1336 resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1337 let svc = state.services.get_mut(&key).expect("resolved key exists");
1338 svc.tags.retain(|t| !keys.contains(&t.key));
1339 }
1340 "task" => {
1341 let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1342 let task = state
1343 .tasks
1344 .get_mut(&task_id)
1345 .ok_or_else(|| resource_not_found(&arn))?;
1346 task.tags.retain(|t| !keys.contains(&t.key));
1347 }
1348 "task-set" => {
1349 let ts = state
1350 .task_sets
1351 .get_mut(&tail)
1352 .ok_or_else(|| resource_not_found(&arn))?;
1353 ts.tags.retain(|t| !keys.contains(&t.key));
1354 }
1355 "container-instance" => {
1356 let key = resolve_container_instance_key(state, &tail)
1357 .ok_or_else(|| resource_not_found(&arn))?;
1358 let ci = state
1359 .container_instances
1360 .get_mut(&key)
1361 .expect("resolved key exists");
1362 ci.tags.retain(|t| !keys.contains(&t.key));
1363 }
1364 "capacity-provider" => {
1365 let cp = state
1366 .capacity_providers
1367 .get_mut(&tail)
1368 .ok_or_else(|| resource_not_found(&arn))?;
1369 cp.tags.retain(|t| !keys.contains(&t.key));
1370 }
1371 other => {
1372 return Err(invalid_parameter(format!(
1373 "Unknown ECS resource type: {other}"
1374 )));
1375 }
1376 }
1377 Ok(AwsResponse::ok_json(json!({})))
1378 }
1379
1380 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1381 let body = request.json_body();
1382 let arn = req_str(&body, "resourceArn")?.to_string();
1383 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1384 let accounts = self.state.read();
1385 let state = accounts
1386 .get(&account)
1387 .ok_or_else(|| resource_not_found(&arn))?;
1388 let tags = match resource_type.as_str() {
1389 "cluster" => state
1390 .clusters
1391 .get(&tail)
1392 .map(|c| c.tags.clone())
1393 .ok_or_else(|| resource_not_found(&arn))?,
1394 "task-definition" => {
1395 let (family, rev) = parse_family_revision(&tail);
1396 let rev = rev.ok_or_else(|| {
1397 invalid_parameter("task-definition ARN must include revision")
1398 })?;
1399 state
1400 .task_definitions
1401 .get(&family)
1402 .and_then(|m| m.get(&rev))
1403 .map(|td| td.tags.clone())
1404 .ok_or_else(|| resource_not_found(&arn))?
1405 }
1406 "service" => {
1407 let key =
1408 resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1409 state
1410 .services
1411 .get(&key)
1412 .map(|s| s.tags.clone())
1413 .expect("resolved key exists")
1414 }
1415 "task" => {
1416 let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1417 state
1418 .tasks
1419 .get(&task_id)
1420 .map(|t| t.tags.clone())
1421 .ok_or_else(|| resource_not_found(&arn))?
1422 }
1423 "task-set" => state
1424 .task_sets
1425 .get(&tail)
1426 .map(|t| t.tags.clone())
1427 .ok_or_else(|| resource_not_found(&arn))?,
1428 "container-instance" => {
1429 let key = resolve_container_instance_key(state, &tail)
1430 .ok_or_else(|| resource_not_found(&arn))?;
1431 state
1432 .container_instances
1433 .get(&key)
1434 .map(|c| c.tags.clone())
1435 .expect("resolved key exists")
1436 }
1437 "capacity-provider" => state
1438 .capacity_providers
1439 .get(&tail)
1440 .map(|c| c.tags.clone())
1441 .ok_or_else(|| resource_not_found(&arn))?,
1442 other => {
1443 return Err(invalid_parameter(format!(
1444 "Unknown ECS resource type: {other}"
1445 )));
1446 }
1447 };
1448 Ok(AwsResponse::ok_json(json!({"tags": tags_json(&tags)})))
1449 }
1450}
1451
1452fn resource_not_found(arn: &str) -> AwsServiceError {
1453 AwsServiceError::aws_error(
1454 StatusCode::BAD_REQUEST,
1455 "ClientException",
1456 format!("The referenced resource was not found: {arn}"),
1457 )
1458}
1459
1460impl EcsService {
1463 fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1464 let body = request.json_body();
1465 let name = req_str(&body, "name")?.to_string();
1466 let value = req_str(&body, "value")?.to_string();
1467 let principal_arn = opt_str(&body, "principalArn")
1468 .map(String::from)
1469 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1470 .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
1471 let account = request.account_id.clone();
1472 let mut accounts = self.state.write();
1473 let state = accounts.get_or_create(&account);
1474 state
1475 .principal_account_settings
1476 .entry(principal_arn.clone())
1477 .or_default()
1478 .insert(name.clone(), value.clone());
1479 Ok(AwsResponse::ok_json(json!({
1480 "setting": {
1481 "name": name,
1482 "value": value,
1483 "principalArn": principal_arn,
1484 }
1485 })))
1486 }
1487
1488 fn put_account_setting_default(
1489 &self,
1490 request: &AwsRequest,
1491 ) -> Result<AwsResponse, AwsServiceError> {
1492 let body = request.json_body();
1493 let name = req_str(&body, "name")?.to_string();
1494 let value = req_str(&body, "value")?.to_string();
1495 let account = request.account_id.clone();
1496 let mut accounts = self.state.write();
1497 let state = accounts.get_or_create(&account);
1498 state
1499 .account_setting_defaults
1500 .insert(name.clone(), value.clone());
1501 Ok(AwsResponse::ok_json(json!({
1502 "setting": {
1503 "name": name,
1504 "value": value,
1505 "principalArn": Arn::global("iam", &state.account_id, "root").to_string(),
1506 }
1507 })))
1508 }
1509
1510 fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1511 let body = request.json_body();
1512 let name = req_str(&body, "name")?.to_string();
1513 let principal_arn = opt_str(&body, "principalArn")
1514 .map(String::from)
1515 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1516 .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
1517 let account = request.account_id.clone();
1518 let mut accounts = self.state.write();
1519 let state = accounts.get_or_create(&account);
1520 let removed_value = state
1521 .principal_account_settings
1522 .get_mut(&principal_arn)
1523 .and_then(|m| m.remove(&name));
1524 Ok(AwsResponse::ok_json(json!({
1525 "setting": {
1526 "name": name,
1527 "value": removed_value.unwrap_or_default(),
1528 "principalArn": principal_arn,
1529 }
1530 })))
1531 }
1532
1533 fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1534 let body = request.json_body();
1535 let name_filter = opt_str(&body, "name");
1536 let value_filter = opt_str(&body, "value");
1537 let principal_filter = opt_str(&body, "principalArn");
1538 let effective_only = body
1539 .get("effectiveSettings")
1540 .and_then(|v| v.as_bool())
1541 .unwrap_or(false);
1542
1543 let account = request.account_id.clone();
1544 let accounts = self.state.read();
1545 let Some(state) = accounts.get(&account) else {
1546 return Ok(AwsResponse::ok_json(json!({"settings": []})));
1547 };
1548 let root_arn = Arn::global("iam", &state.account_id, "root").to_string();
1549 let mut settings: Vec<Value> = Vec::new();
1550
1551 if effective_only {
1552 let principal = principal_filter
1555 .map(String::from)
1556 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1557 .unwrap_or_else(|| root_arn.clone());
1558 let mut merged = state.account_setting_defaults.clone();
1559 if let Some(overrides) = state.principal_account_settings.get(&principal) {
1560 for (k, v) in overrides {
1561 merged.insert(k.clone(), v.clone());
1562 }
1563 }
1564 for (k, v) in merged {
1565 if matches_filter(name_filter, &k) && matches_filter(value_filter, &v) {
1566 settings.push(json!({
1567 "name": k,
1568 "value": v,
1569 "principalArn": principal,
1570 }));
1571 }
1572 }
1573 } else {
1574 for (k, v) in &state.account_setting_defaults {
1577 if matches_filter(name_filter, k)
1578 && matches_filter(value_filter, v)
1579 && (principal_filter.is_none() || principal_filter == Some(root_arn.as_str()))
1580 {
1581 settings.push(json!({
1582 "name": k,
1583 "value": v,
1584 "principalArn": root_arn,
1585 }));
1586 }
1587 }
1588 for (principal, entries) in &state.principal_account_settings {
1589 if principal_filter.is_some_and(|pf| pf != principal) {
1590 continue;
1591 }
1592 for (k, v) in entries {
1593 if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
1594 settings.push(json!({
1595 "name": k,
1596 "value": v,
1597 "principalArn": principal,
1598 }));
1599 }
1600 }
1601 }
1602 }
1603
1604 Ok(AwsResponse::ok_json(json!({"settings": settings})))
1605 }
1606}
1607
/// Returns `true` when no filter is set, or when the filter equals `value`
/// exactly.
fn matches_filter(filter: Option<&str>, value: &str) -> bool {
    match filter {
        None => true,
        Some(expected) => expected == value,
    }
}
1611
1612impl EcsService {
1615 fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1616 let body = request.json_body();
1617 let td_ref = req_str(&body, "taskDefinition")?;
1618 let cluster_ref = opt_str(&body, "cluster");
1619 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1620 let launch_type = opt_str(&body, "launchType")
1621 .unwrap_or("FARGATE")
1622 .to_string();
1623 let count = body
1624 .get("count")
1625 .and_then(|v| v.as_i64())
1626 .filter(|n| (1..=10).contains(n))
1627 .unwrap_or(1) as usize;
1628 let group = opt_str(&body, "group").map(String::from);
1629 let started_by = opt_str(&body, "startedBy").map(String::from);
1630 let tags = parse_tags(&body);
1631
1632 if let Some(overrides) = body.get("overrides") {
1638 if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
1639 self.check_pass_role(&request.account_id, role_arn)?;
1640 }
1641 if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
1642 self.check_pass_role(&request.account_id, role_arn)?;
1643 }
1644 }
1645
1646 let account = request.account_id.clone();
1647 let runtime = self.runtime.clone();
1648 let mut accounts = self.state.write();
1649 let state = accounts.get_or_create(&account);
1650 let cluster_arn = state
1651 .clusters
1652 .get(&cluster_name)
1653 .map(|c| c.cluster_arn.clone())
1654 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
1655 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1656 let revisions = state
1657 .task_definitions
1658 .get(&family)
1659 .ok_or_else(|| task_definition_not_found(td_ref))?;
1660 let td = match rev {
1661 Some(n) => revisions
1662 .get(&n)
1663 .ok_or_else(|| task_definition_not_found(td_ref))?,
1664 None => latest_active_revision(revisions)
1665 .ok_or_else(|| task_definition_not_found(td_ref))?,
1666 };
1667 if td.status != "ACTIVE" {
1668 return Err(client_exception(format!(
1669 "Task definition {} is not ACTIVE",
1670 td.task_definition_arn
1671 )));
1672 }
1673 let td_arn = td.task_definition_arn.clone();
1674 let td_family = td.family.clone();
1675 let td_revision = td.revision;
1676 let td_cpu = td.cpu.clone();
1677 let td_memory = td.memory.clone();
1678 let td_task_role = td.task_role_arn.clone();
1679 let td_exec_role = td.execution_role_arn.clone();
1680 let td_containers = td.container_definitions.clone();
1681
1682 let mut spawned_tasks: Vec<String> = Vec::new();
1683 let mut task_jsons: Vec<Value> = Vec::new();
1684 for _ in 0..count {
1685 let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
1686 let task_arn = state.task_arn(&cluster_name, &task_id);
1687 let containers: Vec<Container> = td_containers
1688 .iter()
1689 .map(|def| Container {
1690 container_arn: format!(
1691 "arn:aws:ecs:{}:{}:container/{}/{}/{}",
1692 state.region,
1693 state.account_id,
1694 cluster_name,
1695 task_id,
1696 def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
1697 ),
1698 name: def
1699 .get("name")
1700 .and_then(|v| v.as_str())
1701 .unwrap_or("app")
1702 .to_string(),
1703 image: def
1704 .get("image")
1705 .and_then(|v| v.as_str())
1706 .unwrap_or("")
1707 .to_string(),
1708 task_arn: task_arn.clone(),
1709 last_status: "PENDING".into(),
1710 exit_code: None,
1711 reason: None,
1712 runtime_id: None,
1713 essential: def
1714 .get("essential")
1715 .and_then(|v| v.as_bool())
1716 .unwrap_or(true),
1717 cpu: def
1718 .get("cpu")
1719 .and_then(|v| v.as_i64())
1720 .map(|n| n.to_string()),
1721 memory: def
1722 .get("memory")
1723 .and_then(|v| v.as_i64())
1724 .map(|n| n.to_string()),
1725 memory_reservation: def
1726 .get("memoryReservation")
1727 .and_then(|v| v.as_i64())
1728 .map(|n| n.to_string()),
1729 network_bindings: Vec::new(),
1730 network_interfaces: Vec::new(),
1731 health_status: Some("UNKNOWN".to_string()),
1732 managed_agents: None,
1733 })
1734 .collect();
1735 let awslogs = td_containers.iter().find_map(|def| {
1736 let name = def.get("name").and_then(|v| v.as_str())?.to_string();
1737 let log_cfg = def.get("logConfiguration")?;
1738 if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
1739 return None;
1740 }
1741 let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
1742 Some(AwsLogsConfig {
1743 group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
1744 stream_prefix: opts
1745 .get("awslogs-stream-prefix")
1746 .and_then(|v| v.as_str())
1747 .map(String::from),
1748 region: opts
1749 .get("awslogs-region")
1750 .and_then(|v| v.as_str())
1751 .unwrap_or(&state.region)
1752 .to_string(),
1753 container_name: name,
1754 })
1755 });
1756 let task = Task {
1757 task_arn: task_arn.clone(),
1758 task_id: task_id.clone(),
1759 cluster_arn: cluster_arn.clone(),
1760 cluster_name: cluster_name.clone(),
1761 task_definition_arn: td_arn.clone(),
1762 family: td_family.clone(),
1763 revision: td_revision,
1764 last_status: "PROVISIONING".into(),
1765 desired_status: "RUNNING".into(),
1766 launch_type: launch_type.clone(),
1767 platform_version: Some("1.4.0".into()),
1768 cpu: body
1769 .get("overrides")
1770 .and_then(|v| v.get("cpu"))
1771 .and_then(|v| v.as_str())
1772 .map(String::from)
1773 .or_else(|| td_cpu.clone()),
1774 memory: body
1775 .get("overrides")
1776 .and_then(|v| v.get("memory"))
1777 .and_then(|v| v.as_str())
1778 .map(String::from)
1779 .or_else(|| td_memory.clone()),
1780 containers,
1781 overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
1782 started_by: started_by.clone(),
1783 group: group.clone(),
1784 connectivity: "CONNECTING".into(),
1785 stop_code: None,
1786 stopped_reason: None,
1787 created_at: Utc::now(),
1788 started_at: None,
1789 stopping_at: None,
1790 stopped_at: None,
1791 pull_started_at: None,
1792 pull_stopped_at: None,
1793 connectivity_at: None,
1794 started_by_ref_id: None,
1795 execution_role_arn: td_exec_role.clone(),
1796 task_role_arn: td_task_role.clone(),
1797 tags: tags.clone(),
1798 awslogs,
1799 captured_logs: String::new(),
1800 protection: None,
1801 };
1802 state.tasks.insert(task_id.clone(), task.clone());
1803 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
1804 cluster.pending_tasks_count += 1;
1805 }
1806 if let Some(t) = state.tasks.get_mut(&task_id) {
1810 t.last_status = "PENDING".into();
1811 }
1812 task_jsons.push(task_to_json(&task));
1813 spawned_tasks.push(task_id.clone());
1814 }
1815 drop(accounts);
1816
1817 if let Some(rt) = runtime {
1819 for id in &spawned_tasks {
1820 rt.clone()
1821 .run_task(self.state.clone(), id.clone(), account.clone());
1822 }
1823 } else {
1824 let mut accounts = self.state.write();
1829 if let Some(state) = accounts.get_mut(&account) {
1830 let mut cluster_drains: Vec<String> = Vec::new();
1831 for id in &spawned_tasks {
1832 if let Some(t) = state.tasks.get_mut(id) {
1833 t.last_status = "STOPPED".into();
1834 t.desired_status = "STOPPED".into();
1835 t.stop_code = Some("TaskFailedToStart".into());
1836 t.stopped_reason = Some(
1837 "No container runtime available (docker/podman not installed)".into(),
1838 );
1839 t.stopped_at = Some(Utc::now());
1840 for c in t.containers.iter_mut() {
1841 c.last_status = "STOPPED".into();
1842 }
1843 cluster_drains.push(t.cluster_name.clone());
1844 }
1845 }
1846 for name in cluster_drains {
1847 if let Some(cluster) = state.clusters.get_mut(&name) {
1848 if cluster.pending_tasks_count > 0 {
1849 cluster.pending_tasks_count -= 1;
1850 }
1851 }
1852 }
1853 }
1854 }
1855
1856 Ok(AwsResponse::ok_json(json!({
1857 "tasks": task_jsons,
1858 "failures": [],
1859 })))
1860 }
1861
1862 fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1863 self.run_task(request)
1868 }
1869
1870 async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1871 let body = request.json_body();
1872 let task_ref = req_str(&body, "task")?;
1873 let reason = opt_str(&body, "reason")
1874 .unwrap_or("UserInitiated")
1875 .to_string();
1876 let cluster_ref = opt_str(&body, "cluster");
1877 let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1878
1879 let (task_id, account, task_snapshot) = {
1880 let account = request.account_id.clone();
1881 let mut accounts = self.state.write();
1882 let state = accounts
1883 .get_mut(&account)
1884 .ok_or_else(|| task_not_found(task_ref))?;
1885 let task_id = resolve_task_id(state, task_ref)?;
1886 let task = state
1887 .tasks
1888 .get_mut(&task_id)
1889 .ok_or_else(|| task_not_found(task_ref))?;
1890 task.desired_status = "STOPPED".into();
1891 task.stopping_at = Some(Utc::now());
1892 task.stopped_reason = Some(reason.clone());
1893 task.stop_code = Some("UserInitiated".into());
1894 (task_id, account, task.clone())
1895 };
1896 if let Some(rt) = &self.runtime {
1897 rt.stop_task(&task_id, &reason).await;
1898 }
1899 let _ = account;
1900 Ok(AwsResponse::ok_json(json!({
1901 "task": task_to_json(&task_snapshot),
1902 })))
1903 }
1904
1905 fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1906 let body = request.json_body();
1907 let refs: Vec<String> = body
1908 .get("tasks")
1909 .and_then(|v| v.as_array())
1910 .map(|arr| {
1911 arr.iter()
1912 .filter_map(|v| v.as_str().map(String::from))
1913 .collect()
1914 })
1915 .unwrap_or_default();
1916 let include_tags = body
1917 .get("include")
1918 .and_then(|v| v.as_array())
1919 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
1920 .unwrap_or(false);
1921
1922 let account = request.account_id.clone();
1923 let accounts = self.state.read();
1924 let Some(state) = accounts.get(&account) else {
1925 return Ok(AwsResponse::ok_json(json!({
1926 "tasks": [],
1927 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
1928 })));
1929 };
1930 let mut found = Vec::new();
1931 let mut failures = Vec::new();
1932 for input in &refs {
1933 let task_id = task_id_from_ref(input);
1934 match state.tasks.get(&task_id) {
1935 Some(t) => {
1936 let mut v = task_to_json(t);
1937 if include_tags {
1938 v.as_object_mut()
1939 .unwrap()
1940 .insert("tags".into(), tags_json(&t.tags));
1941 }
1942 found.push(v);
1943 }
1944 None => {
1945 failures.push(json!({
1946 "arn": input,
1947 "reason": "MISSING",
1948 }));
1949 }
1950 }
1951 }
1952 Ok(AwsResponse::ok_json(json!({
1953 "tasks": found,
1954 "failures": failures,
1955 })))
1956 }
1957
1958 fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1959 let body = request.json_body();
1960 let cluster_ref = opt_str(&body, "cluster");
1961 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1962 let family = opt_str(&body, "family");
1963 let status_filter = opt_str(&body, "desiredStatus");
1964 let started_by = opt_str(&body, "startedBy");
1965 let max_results = body
1966 .get("maxResults")
1967 .and_then(|v| v.as_i64())
1968 .filter(|n| (1..=100).contains(n))
1969 .map(|n| n as usize)
1970 .unwrap_or(100);
1971 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1972
1973 let account = request.account_id.clone();
1974 let accounts = self.state.read();
1975 let mut arns: Vec<String> = match accounts.get(&account) {
1976 Some(state) => state
1977 .tasks
1978 .values()
1979 .filter(|t| t.cluster_name == cluster_name)
1980 .filter(|t| family.is_none_or(|f| t.family == f))
1981 .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
1982 .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
1983 .map(|t| t.task_arn.clone())
1984 .collect(),
1985 None => Vec::new(),
1986 };
1987 arns.sort();
1988 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1989 let end = (start + max_results).min(arns.len());
1990 let page = arns[start..end].to_vec();
1991 let mut out = json!({"taskArns": page});
1992 if end < arns.len() {
1993 out.as_object_mut()
1994 .unwrap()
1995 .insert("nextToken".into(), json!(end.to_string()));
1996 }
1997 Ok(AwsResponse::ok_json(out))
1998 }
1999}
2000
2001fn task_not_found(task_ref: &str) -> AwsServiceError {
2002 AwsServiceError::aws_error(
2003 StatusCode::BAD_REQUEST,
2004 "ClientException",
2005 format!("Task not found: {task_ref}"),
2006 )
2007}
2008
/// Extracts the task id from a task reference.
///
/// The id is everything after the last `/`; a reference without any `/` is
/// already an id and is returned unchanged. A reference ending in `/` yields
/// an empty id.
fn task_id_from_ref(input: &str) -> String {
    // `rsplit_once` is `None` exactly when there is no '/', which makes the
    // "bare id" case explicit instead of hiding it behind a fallback that the
    // always-`Some` `rsplit(..).next()` could never reach.
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_string()
}
2016
2017fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
2018 let id = task_id_from_ref(task_ref);
2019 if state.tasks.contains_key(&id) {
2020 Ok(id)
2021 } else {
2022 Err(task_not_found(task_ref))
2023 }
2024}
2025
2026fn task_to_json(task: &Task) -> Value {
2027 let mut map = serde_json::Map::new();
2028 map.insert("taskArn".into(), json!(task.task_arn));
2029 map.insert("clusterArn".into(), json!(task.cluster_arn));
2030 map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
2031 map.insert("lastStatus".into(), json!(task.last_status));
2032 map.insert("desiredStatus".into(), json!(task.desired_status));
2033 map.insert("launchType".into(), json!(task.launch_type));
2034 if let Some(ref v) = task.platform_version {
2035 map.insert("platformVersion".into(), json!(v));
2036 }
2037 if let Some(ref v) = task.cpu {
2038 map.insert("cpu".into(), json!(v));
2039 }
2040 if let Some(ref v) = task.memory {
2041 map.insert("memory".into(), json!(v));
2042 }
2043 map.insert(
2044 "containers".into(),
2045 Value::Array(task.containers.iter().map(container_to_json).collect()),
2046 );
2047 map.insert("overrides".into(), task.overrides.clone());
2048 if let Some(ref v) = task.started_by {
2049 map.insert("startedBy".into(), json!(v));
2050 }
2051 if let Some(ref v) = task.group {
2052 map.insert("group".into(), json!(v));
2053 }
2054 map.insert("connectivity".into(), json!(task.connectivity));
2055 if let Some(ref v) = task.stop_code {
2056 map.insert("stopCode".into(), json!(v));
2057 }
2058 if let Some(ref v) = task.stopped_reason {
2059 map.insert("stoppedReason".into(), json!(v));
2060 }
2061 if let Some(ref v) = task.task_role_arn {
2062 map.insert("taskRoleArn".into(), json!(v));
2063 }
2064 if let Some(ref v) = task.execution_role_arn {
2065 map.insert("executionRoleArn".into(), json!(v));
2066 }
2067 map.insert("createdAt".into(), json!(task.created_at.timestamp()));
2068 if let Some(ts) = task.started_at {
2069 map.insert("startedAt".into(), json!(ts.timestamp()));
2070 }
2071 if let Some(ts) = task.stopping_at {
2072 map.insert("stoppingAt".into(), json!(ts.timestamp()));
2073 }
2074 if let Some(ts) = task.stopped_at {
2075 map.insert("stoppedAt".into(), json!(ts.timestamp()));
2076 }
2077 if let Some(ts) = task.pull_started_at {
2078 map.insert("pullStartedAt".into(), json!(ts.timestamp()));
2079 }
2080 if let Some(ts) = task.pull_stopped_at {
2081 map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
2082 }
2083 if let Some(ts) = task.connectivity_at {
2084 map.insert("connectivityAt".into(), json!(ts.timestamp()));
2085 }
2086 Value::Object(map)
2087}
2088
2089fn container_to_json(container: &Container) -> Value {
2090 let mut map = serde_json::Map::new();
2091 map.insert("containerArn".into(), json!(container.container_arn));
2092 map.insert("taskArn".into(), json!(container.task_arn));
2093 map.insert("name".into(), json!(container.name));
2094 map.insert("image".into(), json!(container.image));
2095 map.insert("lastStatus".into(), json!(container.last_status));
2096 map.insert("essential".into(), json!(container.essential));
2097 if let Some(code) = container.exit_code {
2098 map.insert("exitCode".into(), json!(code));
2099 }
2100 if let Some(ref r) = container.reason {
2101 map.insert("reason".into(), json!(r));
2102 }
2103 if let Some(ref id) = container.runtime_id {
2104 map.insert("runtimeId".into(), json!(id));
2105 }
2106 if let Some(ref v) = container.cpu {
2107 map.insert("cpu".into(), json!(v));
2108 }
2109 if let Some(ref v) = container.memory {
2110 map.insert("memory".into(), json!(v));
2111 }
2112 if let Some(ref v) = container.memory_reservation {
2113 map.insert("memoryReservation".into(), json!(v));
2114 }
2115 map.insert("networkBindings".into(), json!(container.network_bindings));
2116 map.insert(
2117 "networkInterfaces".into(),
2118 json!(container.network_interfaces),
2119 );
2120 if let Some(ref v) = container.health_status {
2121 map.insert("healthStatus".into(), json!(v));
2122 }
2123 Value::Object(map)
2124}
2125
2126impl EcsService {
2129 fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2130 let body = request.json_body();
2131 let service_name = req_str(&body, "serviceName")?.to_string();
2132 validate_service_name(&service_name)?;
2133 let td_ref = req_str(&body, "taskDefinition")?;
2134 let cluster_ref = opt_str(&body, "cluster");
2135 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2136 let desired_count = body
2137 .get("desiredCount")
2138 .and_then(|v| v.as_i64())
2139 .filter(|n| *n >= 0)
2140 .unwrap_or(1) as i32;
2141 let launch_type = opt_str(&body, "launchType")
2142 .unwrap_or("FARGATE")
2143 .to_string();
2144 let scheduling = opt_str(&body, "schedulingStrategy")
2145 .unwrap_or("REPLICA")
2146 .to_string();
2147 let deployment_controller = body
2148 .get("deploymentController")
2149 .and_then(|v| v.get("type"))
2150 .and_then(|v| v.as_str())
2151 .unwrap_or("ECS")
2152 .to_string();
2153 let deployment_config = body.get("deploymentConfiguration");
2154 let min_healthy = deployment_config
2155 .and_then(|d| d.get("minimumHealthyPercent"))
2156 .and_then(|v| v.as_i64())
2157 .map(|n| n as i32);
2158 let max_percent = deployment_config
2159 .and_then(|d| d.get("maximumPercent"))
2160 .and_then(|v| v.as_i64())
2161 .map(|n| n as i32);
2162 let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
2163 let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
2164 enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
2165 rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
2166 });
2167 let tags = parse_tags(&body);
2168 let role_arn = opt_str(&body, "role").map(String::from);
2169 let load_balancers: Vec<Value> = body
2170 .get("loadBalancers")
2171 .and_then(|v| v.as_array())
2172 .cloned()
2173 .unwrap_or_default();
2174 let service_registries: Vec<Value> = body
2175 .get("serviceRegistries")
2176 .and_then(|v| v.as_array())
2177 .cloned()
2178 .unwrap_or_default();
2179 let placement_constraints: Vec<Value> = body
2180 .get("placementConstraints")
2181 .and_then(|v| v.as_array())
2182 .cloned()
2183 .unwrap_or_default();
2184 let placement_strategy: Vec<Value> = body
2185 .get("placementStrategy")
2186 .and_then(|v| v.as_array())
2187 .cloned()
2188 .unwrap_or_default();
2189 let network_configuration = body.get("networkConfiguration").cloned();
2190
2191 let runtime = self.runtime.clone();
2192 let account = request.account_id.clone();
2193 let principal_arn = request
2194 .principal
2195 .as_ref()
2196 .map(|p| p.arn.clone())
2197 .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
2198
2199 let (service_json, spawn_task_ids) = {
2200 let mut accounts = self.state.write();
2201 let state = accounts.get_or_create(&account);
2202 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2204 let revisions = state
2205 .task_definitions
2206 .get(&family)
2207 .ok_or_else(|| task_definition_not_found(td_ref))?;
2208 let td = match rev {
2209 Some(n) => revisions
2210 .get(&n)
2211 .ok_or_else(|| task_definition_not_found(td_ref))?,
2212 None => latest_active_revision(revisions)
2213 .ok_or_else(|| task_definition_not_found(td_ref))?,
2214 };
2215 let td_arn = td.task_definition_arn.clone();
2216 let td_family = td.family.clone();
2217 let td_revision = td.revision;
2218 let cluster_arn = state
2219 .clusters
2220 .get(&cluster_name)
2221 .map(|c| c.cluster_arn.clone())
2222 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2223 let service_arn = state.service_arn(&cluster_name, &service_name);
2224 let key = EcsState::service_key(&cluster_name, &service_name);
2225 if let Some(existing) = state.services.get(&key) {
2226 if existing.status != "INACTIVE" {
2227 return Err(service_already_exists(&service_name));
2228 }
2229 }
2230 let deployment = Deployment {
2231 deployment_id: format!(
2232 "ecs-svc/{}",
2233 uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2234 ),
2235 status: "PRIMARY".into(),
2236 task_definition_arn: td_arn.clone(),
2237 desired_count,
2238 pending_count: 0,
2239 running_count: 0,
2240 failed_tasks: 0,
2241 created_at: Utc::now(),
2242 updated_at: Utc::now(),
2243 launch_type: launch_type.clone(),
2244 rollout_state: "IN_PROGRESS".into(),
2245 rollout_state_reason: Some("ECS deployment in progress.".into()),
2246 };
2247 let service = Service {
2248 service_name: service_name.clone(),
2249 service_arn: service_arn.clone(),
2250 cluster_name: cluster_name.clone(),
2251 cluster_arn: cluster_arn.clone(),
2252 task_definition_arn: td_arn,
2253 family: td_family,
2254 revision: td_revision,
2255 desired_count,
2256 running_count: 0,
2257 pending_count: 0,
2258 launch_type: launch_type.clone(),
2259 status: "ACTIVE".into(),
2260 scheduling_strategy: scheduling,
2261 deployment_controller,
2262 minimum_healthy_percent: min_healthy,
2263 maximum_percent: max_percent,
2264 circuit_breaker,
2265 deployments: vec![deployment],
2266 load_balancers,
2267 service_registries,
2268 placement_constraints,
2269 placement_strategy,
2270 network_configuration,
2271 tags: tags.clone(),
2272 created_at: Utc::now(),
2273 created_by: Some(principal_arn.clone()),
2274 role_arn,
2275 };
2276 state.services.insert(key.clone(), service.clone());
2277 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2278 cluster.active_services_count += 1;
2279 }
2280 state.push_event(crate::state::LifecycleEvent {
2281 at: Utc::now(),
2282 event_type: "ServiceCreated".into(),
2283 task_arn: None,
2284 cluster_arn: Some(cluster_arn),
2285 last_status: Some("ACTIVE".into()),
2286 detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
2287 });
2288 let ids =
2289 spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
2290 (service_to_json(state.services.get(&key).unwrap()), ids)
2291 };
2292
2293 if let Some(rt) = runtime {
2294 for id in spawn_task_ids {
2295 rt.clone().run_task(self.state.clone(), id, account.clone());
2296 }
2297 }
2298
2299 Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2300 }
2301
2302 fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2303 let body = request.json_body();
2304 let service_ref = req_str(&body, "service")?;
2305 let service_name = service_name_from_ref(service_ref);
2306 let cluster_ref = opt_str(&body, "cluster");
2307 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2308 let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
2309 let new_td_ref = opt_str(&body, "taskDefinition");
2310 let account = request.account_id.clone();
2311 let principal_arn = request
2312 .principal
2313 .as_ref()
2314 .map(|p| p.arn.clone())
2315 .unwrap_or_else(|| Arn::global("iam", &request.account_id, "root").to_string());
2316 let runtime = self.runtime.clone();
2317
2318 let (service_json, spawn_ids, stop_ids) = {
2319 let mut accounts = self.state.write();
2320 let state = accounts
2321 .get_mut(&account)
2322 .ok_or_else(|| service_not_found(&service_name))?;
2323 let key = EcsState::service_key(&cluster_name, &service_name);
2324 if !state.services.contains_key(&key) {
2325 return Err(service_not_found(&service_name));
2326 }
2327
2328 let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
2330 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2331 let revisions = state
2332 .task_definitions
2333 .get(&family)
2334 .ok_or_else(|| task_definition_not_found(td_ref))?;
2335 let td = match rev {
2336 Some(n) => revisions
2337 .get(&n)
2338 .ok_or_else(|| task_definition_not_found(td_ref))?,
2339 None => latest_active_revision(revisions)
2340 .ok_or_else(|| task_definition_not_found(td_ref))?,
2341 };
2342 (
2343 Some(td.task_definition_arn.clone()),
2344 td.family.clone(),
2345 td.revision,
2346 )
2347 } else {
2348 let svc = state.services.get(&key).unwrap();
2349 (None, svc.family.clone(), svc.revision)
2350 };
2351
2352 let service_cluster_arn;
2353 let launch_type_clone;
2354 let effective_desired;
2355 let old_desired;
2356 let mut old_deployments_drained: Vec<String> = Vec::new();
2357 let mut new_deployment_triggered = false;
2358
2359 {
2360 let svc = state.services.get_mut(&key).unwrap();
2361 old_desired = svc.desired_count;
2362 service_cluster_arn = svc.cluster_arn.clone();
2363 launch_type_clone = svc.launch_type.clone();
2364
2365 if let Some(n) = new_desired {
2366 let n = n.max(0) as i32;
2367 svc.desired_count = n;
2368 if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
2369 d.desired_count = n;
2370 d.updated_at = Utc::now();
2371 }
2372 }
2373
2374 if let Some(arn) = new_td_arn.clone() {
2375 for d in svc.deployments.iter_mut() {
2378 if d.status == "PRIMARY" {
2379 d.status = "ACTIVE".into();
2380 old_deployments_drained.push(d.deployment_id.clone());
2381 }
2382 }
2383 svc.deployments.insert(
2384 0,
2385 Deployment {
2386 deployment_id: format!(
2387 "ecs-svc/{}",
2388 uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2389 ),
2390 status: "PRIMARY".into(),
2391 task_definition_arn: arn.clone(),
2392 desired_count: svc.desired_count,
2393 pending_count: 0,
2394 running_count: 0,
2395 failed_tasks: 0,
2396 created_at: Utc::now(),
2397 updated_at: Utc::now(),
2398 launch_type: svc.launch_type.clone(),
2399 rollout_state: "IN_PROGRESS".into(),
2400 rollout_state_reason: Some("ECS deployment in progress.".into()),
2401 },
2402 );
2403 svc.task_definition_arn = arn;
2404 svc.family = new_family;
2405 svc.revision = new_revision;
2406 new_deployment_triggered = true;
2407 }
2408
2409 effective_desired = svc.desired_count;
2410 }
2411
2412 let mut spawn: Vec<String> = Vec::new();
2414 let mut stop: Vec<String> = Vec::new();
2415
2416 let service_tag = format!("ecs-svc/{}", service_name);
2418 let current_tasks: Vec<(String, String)> = state
2419 .tasks
2420 .iter()
2421 .filter(|(_, t)| {
2422 t.started_by.as_deref() == Some(service_tag.as_str())
2423 && t.cluster_name == cluster_name
2424 && t.last_status != "STOPPED"
2425 })
2426 .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
2427 .collect();
2428
2429 let current_count = current_tasks.len() as i32;
2430 if effective_desired > current_count {
2431 let add = (effective_desired - current_count) as usize;
2432 let svc_snapshot = state.services.get(&key).unwrap().clone();
2433 let mut new_ids = spawn_service_tasks(
2434 state,
2435 &svc_snapshot,
2436 add as i32,
2437 &principal_arn,
2438 &launch_type_clone,
2439 );
2440 spawn.append(&mut new_ids);
2441 } else if effective_desired < current_count {
2442 let remove = (current_count - effective_desired) as usize;
2443 for (id, _) in current_tasks.iter().take(remove) {
2444 stop.push(id.clone());
2445 }
2446 }
2447
2448 if new_deployment_triggered {
2454 let new_td_arn_match = state
2455 .services
2456 .get(&key)
2457 .unwrap()
2458 .task_definition_arn
2459 .clone();
2460 let kept_on_new_td: i32 = current_tasks
2464 .iter()
2465 .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
2466 .count() as i32;
2467 for (id, t_arn) in ¤t_tasks {
2468 if *t_arn != new_td_arn_match && !stop.contains(id) {
2469 stop.push(id.clone());
2470 }
2471 }
2472 let already_spawned = spawn.len() as i32;
2473 let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
2474 if need > 0 {
2475 let svc_snapshot = state.services.get(&key).unwrap().clone();
2476 let mut more = spawn_service_tasks(
2477 state,
2478 &svc_snapshot,
2479 need,
2480 &principal_arn,
2481 &launch_type_clone,
2482 );
2483 spawn.append(&mut more);
2484 }
2485 }
2486
2487 state.push_event(crate::state::LifecycleEvent {
2488 at: Utc::now(),
2489 event_type: "ServiceUpdated".into(),
2490 task_arn: None,
2491 cluster_arn: Some(service_cluster_arn),
2492 last_status: Some("ACTIVE".into()),
2493 detail: json!({
2494 "serviceArn": state.services.get(&key).unwrap().service_arn,
2495 "desiredCount": effective_desired,
2496 "previousDesiredCount": old_desired,
2497 "newDeployment": new_deployment_triggered,
2498 "drainedDeployments": old_deployments_drained,
2499 }),
2500 });
2501
2502 let svc = state.services.get(&key).unwrap();
2503 (service_to_json(svc), spawn, stop)
2504 };
2505
2506 if let Some(rt) = runtime {
2507 for id in spawn_ids {
2508 rt.clone().run_task(self.state.clone(), id, account.clone());
2509 }
2510 for id in stop_ids {
2511 let rt2 = rt.clone();
2512 let id_clone = id.clone();
2513 tokio::spawn(async move {
2514 rt2.stop_task(&id_clone, "ECS service scale-down").await;
2515 });
2516 let mut accounts = self.state.write();
2518 if let Some(state) = accounts.get_mut(&account) {
2519 if let Some(task) = state.tasks.get_mut(&id) {
2520 task.desired_status = "STOPPED".into();
2521 task.stopping_at = Some(Utc::now());
2522 }
2523 }
2524 }
2525 }
2526
2527 Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2528 }
2529
2530 async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2531 let body = request.json_body();
2532 let service_ref = req_str(&body, "service")?;
2533 let service_name = service_name_from_ref(service_ref);
2534 let cluster_ref = opt_str(&body, "cluster");
2535 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2536 let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
2537
2538 let (snapshot, task_ids_to_stop) = {
2539 let mut accounts = self.state.write();
2540 let state = accounts
2541 .get_mut(&request.account_id)
2542 .ok_or_else(|| service_not_found(&service_name))?;
2543 let key = EcsState::service_key(&cluster_name, &service_name);
2544 let svc = state
2545 .services
2546 .get_mut(&key)
2547 .ok_or_else(|| service_not_found(&service_name))?;
2548 if !force && svc.desired_count > 0 {
2549 return Err(client_exception(
2550 "The service cannot be stopped while it is scaled above 0. \
2551 Either set desiredCount to 0 first, or pass force=true.",
2552 ));
2553 }
2554 svc.desired_count = 0;
2555 svc.status = "DRAINING".into();
2556 let service_tag = format!("ecs-svc/{}", service_name);
2557 let stop_ids: Vec<String> = state
2558 .tasks
2559 .iter()
2560 .filter(|(_, t)| {
2561 t.started_by.as_deref() == Some(service_tag.as_str())
2562 && t.cluster_name == cluster_name
2563 && t.last_status != "STOPPED"
2564 })
2565 .map(|(id, _)| id.clone())
2566 .collect();
2567 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2568 if cluster.active_services_count > 0 {
2569 cluster.active_services_count -= 1;
2570 }
2571 }
2572 let svc_snapshot = state.services.get(&key).unwrap().clone();
2573 state.services.remove(&key);
2574 state.push_event(crate::state::LifecycleEvent {
2575 at: Utc::now(),
2576 event_type: "ServiceDeleted".into(),
2577 task_arn: None,
2578 cluster_arn: Some(svc_snapshot.cluster_arn.clone()),
2579 last_status: Some("DRAINING".into()),
2580 detail: json!({"serviceArn": svc_snapshot.service_arn}),
2581 });
2582 (svc_snapshot, stop_ids)
2583 };
2584
2585 if let Some(rt) = &self.runtime {
2586 for id in &task_ids_to_stop {
2587 rt.stop_task(id, "ECS service deletion").await;
2588 let mut accounts = self.state.write();
2589 if let Some(state) = accounts.get_mut(&request.account_id) {
2590 if let Some(task) = state.tasks.get_mut(id) {
2591 task.desired_status = "STOPPED".into();
2592 task.stopping_at = Some(Utc::now());
2593 }
2594 }
2595 }
2596 }
2597
2598 Ok(AwsResponse::ok_json(
2599 json!({ "service": service_to_json(&snapshot) }),
2600 ))
2601 }
2602
2603 fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2604 let body = request.json_body();
2605 let cluster_ref = opt_str(&body, "cluster");
2606 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2607 let refs: Vec<String> = body
2608 .get("services")
2609 .and_then(|v| v.as_array())
2610 .map(|arr| {
2611 arr.iter()
2612 .filter_map(|v| v.as_str().map(String::from))
2613 .collect()
2614 })
2615 .unwrap_or_default();
2616
2617 let account = request.account_id.clone();
2618 let accounts = self.state.read();
2619 let mut found = Vec::new();
2620 let mut failures = Vec::new();
2621 let Some(state) = accounts.get(&account) else {
2622 for r in &refs {
2623 failures.push(json!({"arn": r, "reason": "MISSING"}));
2624 }
2625 return Ok(AwsResponse::ok_json(
2626 json!({"services": found, "failures": failures}),
2627 ));
2628 };
2629 for r in &refs {
2630 let name = service_name_from_ref(r);
2631 let key = EcsState::service_key(&cluster_name, &name);
2632 match state.services.get(&key) {
2633 Some(svc) => {
2634 let mut v = service_to_json(svc);
2635 recompute_service_counts(state, &name, &cluster_name, &mut v);
2637 found.push(v);
2638 }
2639 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
2640 }
2641 }
2642 Ok(AwsResponse::ok_json(json!({
2643 "services": found,
2644 "failures": failures,
2645 })))
2646 }
2647
2648 fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2649 let body = request.json_body();
2650 let cluster_ref = opt_str(&body, "cluster");
2651 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2652 let launch_type = opt_str(&body, "launchType");
2653 let scheduling = opt_str(&body, "schedulingStrategy");
2654 let max_results = body
2655 .get("maxResults")
2656 .and_then(|v| v.as_i64())
2657 .filter(|n| (1..=100).contains(n))
2658 .map(|n| n as usize)
2659 .unwrap_or(100);
2660 let next_token = opt_str(&body, "nextToken").unwrap_or("");
2661
2662 let account = request.account_id.clone();
2663 let accounts = self.state.read();
2664 let mut arns: Vec<String> = match accounts.get(&account) {
2665 Some(state) => state
2666 .services
2667 .values()
2668 .filter(|s| s.cluster_name == cluster_name)
2669 .filter(|s| launch_type.is_none_or(|lt| s.launch_type == lt))
2670 .filter(|s| scheduling.is_none_or(|sc| s.scheduling_strategy == sc))
2671 .map(|s| s.service_arn.clone())
2672 .collect(),
2673 None => Vec::new(),
2674 };
2675 arns.sort();
2676 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
2677 let end = (start + max_results).min(arns.len());
2678 let page = arns[start..end].to_vec();
2679 let mut out = json!({"serviceArns": page});
2680 if end < arns.len() {
2681 out.as_object_mut()
2682 .unwrap()
2683 .insert("nextToken".into(), json!(end.to_string()));
2684 }
2685 Ok(AwsResponse::ok_json(out))
2686 }
2687
2688 fn list_services_by_namespace(
2689 &self,
2690 request: &AwsRequest,
2691 ) -> Result<AwsResponse, AwsServiceError> {
2692 let body = request.json_body();
2698 let namespace = req_str(&body, "namespace")?.to_string();
2699 let account = request.account_id.clone();
2700 let accounts = self.state.read();
2701 let mut arns: Vec<String> = match accounts.get(&account) {
2702 Some(state) => state
2703 .services
2704 .values()
2705 .filter(|s| {
2706 s.service_registries.iter().any(|r| {
2707 r.get("registryArn")
2708 .and_then(|v| v.as_str())
2709 .is_some_and(|arn| arn.contains(&namespace))
2710 })
2711 })
2712 .map(|s| s.service_arn.clone())
2713 .collect(),
2714 None => Vec::new(),
2715 };
2716 arns.sort();
2717 Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
2718 }
2719}
2720
2721fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
2722 if name.is_empty() || name.len() > 255 {
2723 return Err(invalid_parameter("Service name must be 1-255 characters"));
2724 }
2725 let ok = name
2726 .chars()
2727 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
2728 if !ok {
2729 return Err(invalid_parameter(
2730 "Service name may only contain letters, numbers, hyphens, and underscores",
2731 ));
2732 }
2733 Ok(())
2734}
2735
/// Extracts the service name from a service reference.
///
/// Accepts either a bare name or a slash-delimited reference (for example a
/// service ARN) and returns everything after the last `/`. Input without a
/// `/` is returned unchanged; input that ends in `/` yields an empty string.
fn service_name_from_ref(input: &str) -> String {
    // `rsplit_once` spells out the "no slash" case directly, so no separate
    // (and unreachable) fallback branch is needed.
    input
        .rsplit_once('/')
        .map_or(input, |(_, name)| name)
        .to_string()
}
2742
2743fn service_not_found(name: &str) -> AwsServiceError {
2744 AwsServiceError::aws_error(
2745 StatusCode::BAD_REQUEST,
2746 "ServiceNotFoundException",
2747 format!("The service could not be found: {name}"),
2748 )
2749}
2750
2751fn service_already_exists(name: &str) -> AwsServiceError {
2752 AwsServiceError::aws_error(
2753 StatusCode::BAD_REQUEST,
2754 "ServiceNotActiveException",
2755 format!("The service {name} already exists"),
2756 )
2757}
2758
2759fn spawn_service_tasks(
2764 state: &mut EcsState,
2765 service: &Service,
2766 count: i32,
2767 principal_arn: &str,
2768 launch_type: &str,
2769) -> Vec<String> {
2770 if count <= 0 {
2771 return Vec::new();
2772 }
2773 let Some(revisions) = state.task_definitions.get(&service.family) else {
2774 return Vec::new();
2775 };
2776 let Some(td) = revisions.get(&service.revision) else {
2777 return Vec::new();
2778 };
2779 let container_defs = td.container_definitions.clone();
2780 let cpu = td.cpu.clone();
2781 let memory = td.memory.clone();
2782 let task_role = td.task_role_arn.clone();
2783 let exec_role = td.execution_role_arn.clone();
2784 let cluster_name = service.cluster_name.clone();
2785 let cluster_arn = service.cluster_arn.clone();
2786 let td_arn = service.task_definition_arn.clone();
2787 let family = service.family.clone();
2788 let revision = service.revision;
2789 let service_tag = format!("ecs-svc/{}", service.service_name);
2790
2791 let mut ids = Vec::with_capacity(count as usize);
2792 for _ in 0..count {
2793 let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2794 let task_arn = state.task_arn(&cluster_name, &task_id);
2795 let containers: Vec<Container> = container_defs
2796 .iter()
2797 .map(|def| Container {
2798 container_arn: format!(
2799 "arn:aws:ecs:{}:{}:container/{}/{}/{}",
2800 state.region,
2801 state.account_id,
2802 cluster_name,
2803 task_id,
2804 def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
2805 ),
2806 name: def
2807 .get("name")
2808 .and_then(|v| v.as_str())
2809 .unwrap_or("app")
2810 .to_string(),
2811 image: def
2812 .get("image")
2813 .and_then(|v| v.as_str())
2814 .unwrap_or("")
2815 .to_string(),
2816 task_arn: task_arn.clone(),
2817 last_status: "PENDING".into(),
2818 exit_code: None,
2819 reason: None,
2820 runtime_id: None,
2821 essential: def
2822 .get("essential")
2823 .and_then(|v| v.as_bool())
2824 .unwrap_or(true),
2825 cpu: def
2826 .get("cpu")
2827 .and_then(|v| v.as_i64())
2828 .map(|n| n.to_string()),
2829 memory: def
2830 .get("memory")
2831 .and_then(|v| v.as_i64())
2832 .map(|n| n.to_string()),
2833 memory_reservation: def
2834 .get("memoryReservation")
2835 .and_then(|v| v.as_i64())
2836 .map(|n| n.to_string()),
2837 network_bindings: Vec::new(),
2838 network_interfaces: Vec::new(),
2839 health_status: Some("UNKNOWN".into()),
2840 managed_agents: None,
2841 })
2842 .collect();
2843 let awslogs = container_defs.iter().find_map(|def| {
2844 let name = def.get("name").and_then(|v| v.as_str())?.to_string();
2845 let log_cfg = def.get("logConfiguration")?;
2846 if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
2847 return None;
2848 }
2849 let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
2850 Some(AwsLogsConfig {
2851 group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
2852 stream_prefix: opts
2853 .get("awslogs-stream-prefix")
2854 .and_then(|v| v.as_str())
2855 .map(String::from),
2856 region: opts
2857 .get("awslogs-region")
2858 .and_then(|v| v.as_str())
2859 .unwrap_or(&state.region)
2860 .to_string(),
2861 container_name: name,
2862 })
2863 });
2864 let task = Task {
2865 task_arn: task_arn.clone(),
2866 task_id: task_id.clone(),
2867 cluster_arn: cluster_arn.clone(),
2868 cluster_name: cluster_name.clone(),
2869 task_definition_arn: td_arn.clone(),
2870 family: family.clone(),
2871 revision,
2872 last_status: "PENDING".into(),
2873 desired_status: "RUNNING".into(),
2874 launch_type: launch_type.into(),
2875 platform_version: Some("1.4.0".into()),
2876 cpu: cpu.clone(),
2877 memory: memory.clone(),
2878 containers,
2879 overrides: json!({}),
2880 started_by: Some(service_tag.clone()),
2881 group: Some(format!("service:{}", service.service_name)),
2882 connectivity: "CONNECTING".into(),
2883 stop_code: None,
2884 stopped_reason: None,
2885 created_at: Utc::now(),
2886 started_at: None,
2887 stopping_at: None,
2888 stopped_at: None,
2889 pull_started_at: None,
2890 pull_stopped_at: None,
2891 connectivity_at: None,
2892 started_by_ref_id: None,
2893 execution_role_arn: exec_role.clone(),
2894 task_role_arn: task_role.clone(),
2895 tags: Vec::new(),
2896 awslogs,
2897 captured_logs: String::new(),
2898 protection: None,
2899 };
2900 state.tasks.insert(task_id.clone(), task);
2901 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2902 cluster.pending_tasks_count += 1;
2903 }
2904 ids.push(task_id);
2905 }
2906 let _ = principal_arn;
2907 ids
2908}
2909
2910fn recompute_service_counts(
2911 state: &EcsState,
2912 service_name: &str,
2913 cluster_name: &str,
2914 service_json: &mut Value,
2915) {
2916 let service_tag = format!("ecs-svc/{}", service_name);
2917 let mut running = 0i32;
2918 let mut pending = 0i32;
2919 for t in state.tasks.values() {
2920 if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
2921 match t.last_status.as_str() {
2922 "RUNNING" => running += 1,
2923 "PENDING" | "PROVISIONING" => pending += 1,
2924 _ => {}
2925 }
2926 }
2927 }
2928 if let Some(map) = service_json.as_object_mut() {
2929 map.insert("runningCount".into(), json!(running));
2930 map.insert("pendingCount".into(), json!(pending));
2931 }
2932}
2933
2934fn service_to_json(svc: &Service) -> Value {
2935 let mut map = serde_json::Map::new();
2936 map.insert("serviceArn".into(), json!(svc.service_arn));
2937 map.insert("serviceName".into(), json!(svc.service_name));
2938 map.insert("clusterArn".into(), json!(svc.cluster_arn));
2939 map.insert("status".into(), json!(svc.status));
2940 map.insert("desiredCount".into(), json!(svc.desired_count));
2941 map.insert("runningCount".into(), json!(svc.running_count));
2942 map.insert("pendingCount".into(), json!(svc.pending_count));
2943 map.insert("launchType".into(), json!(svc.launch_type));
2944 map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
2945 map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
2946 map.insert(
2947 "deploymentController".into(),
2948 json!({"type": svc.deployment_controller}),
2949 );
2950 let mut deployment_cfg = serde_json::Map::new();
2951 if let Some(n) = svc.minimum_healthy_percent {
2952 deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
2953 }
2954 if let Some(n) = svc.maximum_percent {
2955 deployment_cfg.insert("maximumPercent".into(), json!(n));
2956 }
2957 if let Some(ref cb) = svc.circuit_breaker {
2958 deployment_cfg.insert(
2959 "deploymentCircuitBreaker".into(),
2960 json!({"enable": cb.enable, "rollback": cb.rollback}),
2961 );
2962 }
2963 if !deployment_cfg.is_empty() {
2964 map.insert(
2965 "deploymentConfiguration".into(),
2966 Value::Object(deployment_cfg),
2967 );
2968 }
2969 map.insert(
2970 "deployments".into(),
2971 Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
2972 );
2973 map.insert(
2974 "loadBalancers".into(),
2975 Value::Array(svc.load_balancers.clone()),
2976 );
2977 map.insert(
2978 "serviceRegistries".into(),
2979 Value::Array(svc.service_registries.clone()),
2980 );
2981 map.insert(
2982 "placementConstraints".into(),
2983 Value::Array(svc.placement_constraints.clone()),
2984 );
2985 map.insert(
2986 "placementStrategy".into(),
2987 Value::Array(svc.placement_strategy.clone()),
2988 );
2989 if let Some(ref v) = svc.network_configuration {
2990 map.insert("networkConfiguration".into(), v.clone());
2991 }
2992 if let Some(ref v) = svc.role_arn {
2993 map.insert("roleArn".into(), json!(v));
2994 }
2995 if let Some(ref v) = svc.created_by {
2996 map.insert("createdBy".into(), json!(v));
2997 }
2998 map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
2999 Value::Object(map)
3000}
3001
3002fn deployment_to_json(d: &Deployment) -> Value {
3003 json!({
3004 "id": d.deployment_id,
3005 "status": d.status,
3006 "taskDefinition": d.task_definition_arn,
3007 "desiredCount": d.desired_count,
3008 "pendingCount": d.pending_count,
3009 "runningCount": d.running_count,
3010 "failedTasks": d.failed_tasks,
3011 "createdAt": d.created_at.timestamp(),
3012 "updatedAt": d.updated_at.timestamp(),
3013 "launchType": d.launch_type,
3014 "rolloutState": d.rollout_state,
3015 "rolloutStateReason": d.rollout_state_reason,
3016 })
3017}
3018
3019impl EcsService {
3022 fn register_container_instance(
3023 &self,
3024 request: &AwsRequest,
3025 ) -> Result<AwsResponse, AwsServiceError> {
3026 let body = request.json_body();
3027 let cluster_ref = opt_str(&body, "cluster");
3028 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3029 let ec2_id = opt_str(&body, "instanceIdentityDocument")
3030 .and_then(|s| serde_json::from_str::<Value>(s).ok())
3031 .and_then(|v| {
3032 v.get("instanceId")
3033 .and_then(|x| x.as_str())
3034 .map(String::from)
3035 });
3036 let tags = parse_tags(&body);
3037
3038 let account = request.account_id.clone();
3039 let mut accounts = self.state.write();
3040 let state = accounts.get_or_create(&account);
3041 let cluster_arn = state
3042 .clusters
3043 .get(&cluster_name)
3044 .map(|c| c.cluster_arn.clone())
3045 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
3046 let uuid = uuid::Uuid::new_v4().to_string();
3047 let ci_arn = state.container_instance_arn(&cluster_name, &uuid);
3048 let key = format!("{}/{}", cluster_name, uuid);
3049 let ci = ContainerInstance {
3050 container_instance_arn: ci_arn.clone(),
3051 ec2_instance_id: ec2_id,
3052 cluster_name: cluster_name.clone(),
3053 cluster_arn,
3054 status: "ACTIVE".into(),
3055 version: 1,
3056 version_info: body.get("versionInfo").cloned(),
3057 agent_connected: true,
3058 agent_update_status: None,
3059 remaining_resources: body
3060 .get("totalResources")
3061 .and_then(|v| v.as_array())
3062 .cloned()
3063 .unwrap_or_default(),
3064 registered_resources: body
3065 .get("totalResources")
3066 .and_then(|v| v.as_array())
3067 .cloned()
3068 .unwrap_or_default(),
3069 running_tasks_count: 0,
3070 pending_tasks_count: 0,
3071 registered_at: Utc::now(),
3072 attributes: body
3073 .get("attributes")
3074 .and_then(|v| v.as_array())
3075 .map(|arr| {
3076 arr.iter()
3077 .filter_map(|a| {
3078 let name = a.get("name").and_then(|v| v.as_str())?;
3079 Some(AttributeRef {
3080 name: name.to_string(),
3081 value: a.get("value").and_then(|v| v.as_str()).map(String::from),
3082 target_type: a
3083 .get("targetType")
3084 .and_then(|v| v.as_str())
3085 .map(String::from),
3086 target_id: a
3087 .get("targetId")
3088 .and_then(|v| v.as_str())
3089 .map(String::from),
3090 })
3091 })
3092 .collect()
3093 })
3094 .unwrap_or_default(),
3095 tags,
3096 capacity_provider_name: None,
3097 health_status: None,
3098 };
3099 state.container_instances.insert(key, ci.clone());
3100 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3101 cluster.registered_container_instances_count += 1;
3102 }
3103 Ok(AwsResponse::ok_json(json!({
3104 "containerInstance": container_instance_to_json(&ci),
3105 })))
3106 }
3107
3108 fn deregister_container_instance(
3109 &self,
3110 request: &AwsRequest,
3111 ) -> Result<AwsResponse, AwsServiceError> {
3112 let body = request.json_body();
3113 let ci_ref = req_str(&body, "containerInstance")?.to_string();
3114 let cluster_ref = opt_str(&body, "cluster");
3115 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3116 let id = container_instance_id_from_ref(&ci_ref);
3117 let key = format!("{}/{}", cluster_name, id);
3118
3119 let account = request.account_id.clone();
3120 let mut accounts = self.state.write();
3121 let state = accounts
3122 .get_mut(&account)
3123 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3124 let mut ci = state
3125 .container_instances
3126 .remove(&key)
3127 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3128 ci.status = "INACTIVE".into();
3129 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3130 if cluster.registered_container_instances_count > 0 {
3131 cluster.registered_container_instances_count -= 1;
3132 }
3133 }
3134 Ok(AwsResponse::ok_json(json!({
3135 "containerInstance": container_instance_to_json(&ci),
3136 })))
3137 }
3138
3139 fn describe_container_instances(
3140 &self,
3141 request: &AwsRequest,
3142 ) -> Result<AwsResponse, AwsServiceError> {
3143 let body = request.json_body();
3144 let cluster_ref = opt_str(&body, "cluster");
3145 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3146 let refs: Vec<String> = body
3147 .get("containerInstances")
3148 .and_then(|v| v.as_array())
3149 .map(|arr| {
3150 arr.iter()
3151 .filter_map(|v| v.as_str().map(String::from))
3152 .collect()
3153 })
3154 .unwrap_or_default();
3155
3156 let accounts = self.state.read();
3157 let mut found = Vec::new();
3158 let mut failures = Vec::new();
3159 if let Some(state) = accounts.get(&request.account_id) {
3160 for r in &refs {
3161 let id = container_instance_id_from_ref(r);
3162 let key = format!("{}/{}", cluster_name, id);
3163 match state.container_instances.get(&key) {
3164 Some(ci) => found.push(container_instance_to_json(ci)),
3165 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3166 }
3167 }
3168 } else {
3169 for r in &refs {
3170 failures.push(json!({"arn": r, "reason": "MISSING"}));
3171 }
3172 }
3173 Ok(AwsResponse::ok_json(json!({
3174 "containerInstances": found,
3175 "failures": failures,
3176 })))
3177 }
3178
3179 fn list_container_instances(
3180 &self,
3181 request: &AwsRequest,
3182 ) -> Result<AwsResponse, AwsServiceError> {
3183 let body = request.json_body();
3184 let cluster_ref = opt_str(&body, "cluster");
3185 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3186 let status_filter = opt_str(&body, "status");
3187 let max_results = body
3188 .get("maxResults")
3189 .and_then(|v| v.as_i64())
3190 .filter(|n| (1..=100).contains(n))
3191 .map(|n| n as usize)
3192 .unwrap_or(100);
3193 let next_token = opt_str(&body, "nextToken").unwrap_or("");
3194
3195 let accounts = self.state.read();
3196 let mut arns: Vec<String> = match accounts.get(&request.account_id) {
3197 Some(state) => state
3198 .container_instances
3199 .values()
3200 .filter(|ci| ci.cluster_name == cluster_name)
3201 .filter(|ci| status_filter.is_none_or(|s| ci.status == s))
3202 .map(|ci| ci.container_instance_arn.clone())
3203 .collect(),
3204 None => Vec::new(),
3205 };
3206 arns.sort();
3207 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
3208 let end = (start + max_results).min(arns.len());
3209 let page = arns[start..end].to_vec();
3210 let mut out = json!({"containerInstanceArns": page});
3211 if end < arns.len() {
3212 out.as_object_mut()
3213 .unwrap()
3214 .insert("nextToken".into(), json!(end.to_string()));
3215 }
3216 Ok(AwsResponse::ok_json(out))
3217 }
3218
3219 fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3220 let body = request.json_body();
3221 let ci_ref = req_str(&body, "containerInstance")?.to_string();
3222 let cluster_ref = opt_str(&body, "cluster");
3223 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3224 let id = container_instance_id_from_ref(&ci_ref);
3225 let key = format!("{}/{}", cluster_name, id);
3226 let mut accounts = self.state.write();
3227 let state = accounts
3228 .get_mut(&request.account_id)
3229 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3230 let ci = state
3231 .container_instances
3232 .get_mut(&key)
3233 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3234 ci.agent_update_status = Some("UPDATED".into());
3235 Ok(AwsResponse::ok_json(json!({
3236 "containerInstance": container_instance_to_json(ci),
3237 })))
3238 }
3239
3240 fn update_container_instances_state(
3241 &self,
3242 request: &AwsRequest,
3243 ) -> Result<AwsResponse, AwsServiceError> {
3244 let body = request.json_body();
3245 let status = req_str(&body, "status")?.to_string();
3246 let cluster_ref = opt_str(&body, "cluster");
3247 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3248 let refs: Vec<String> = body
3249 .get("containerInstances")
3250 .and_then(|v| v.as_array())
3251 .map(|arr| {
3252 arr.iter()
3253 .filter_map(|v| v.as_str().map(String::from))
3254 .collect()
3255 })
3256 .unwrap_or_default();
3257 let mut accounts = self.state.write();
3258 let state = accounts
3259 .get_mut(&request.account_id)
3260 .ok_or_else(|| client_exception("account not found"))?;
3261 let mut found = Vec::new();
3262 let mut failures = Vec::new();
3263 for r in &refs {
3264 let id = container_instance_id_from_ref(r);
3265 let key = format!("{}/{}", cluster_name, id);
3266 match state.container_instances.get_mut(&key) {
3267 Some(ci) => {
3268 ci.status = status.clone();
3269 found.push(container_instance_to_json(ci));
3270 }
3271 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3272 }
3273 }
3274 Ok(AwsResponse::ok_json(json!({
3275 "containerInstances": found,
3276 "failures": failures,
3277 })))
3278 }
3279
3280 fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3281 let body = request.json_body();
3282 let cluster_ref = opt_str(&body, "cluster");
3283 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3284 let attrs = body
3285 .get("attributes")
3286 .and_then(|v| v.as_array())
3287 .cloned()
3288 .unwrap_or_default();
3289
3290 let mut accounts = self.state.write();
3291 let state = accounts.get_or_create(&request.account_id);
3292 let mut stored = Vec::new();
3293 for a in &attrs {
3294 let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3295 continue;
3296 };
3297 let target_type = a
3298 .get("targetType")
3299 .and_then(|v| v.as_str())
3300 .unwrap_or("container-instance")
3301 .to_string();
3302 let target_id = a
3303 .get("targetId")
3304 .and_then(|v| v.as_str())
3305 .unwrap_or("")
3306 .to_string();
3307 let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
3308 let key = format!("{}/{}/{}", cluster_name, target_id, name);
3309 let attr = Attribute {
3310 cluster_name: cluster_name.clone(),
3311 target_type: target_type.clone(),
3312 target_id: target_id.clone(),
3313 name: name.to_string(),
3314 value: value.clone(),
3315 };
3316 state.attributes.insert(key, attr);
3317 stored.push(json!({
3318 "name": name,
3319 "value": value,
3320 "targetType": target_type,
3321 "targetId": target_id,
3322 }));
3323 }
3324 Ok(AwsResponse::ok_json(json!({"attributes": stored})))
3325 }
3326
3327 fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3328 let body = request.json_body();
3329 let cluster_ref = opt_str(&body, "cluster");
3330 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3331 let attrs = body
3332 .get("attributes")
3333 .and_then(|v| v.as_array())
3334 .cloned()
3335 .unwrap_or_default();
3336 let mut accounts = self.state.write();
3337 let state = accounts.get_or_create(&request.account_id);
3338 let mut deleted = Vec::new();
3339 for a in &attrs {
3340 let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3341 continue;
3342 };
3343 let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
3344 let key = format!("{}/{}/{}", cluster_name, target_id, name);
3345 if let Some(attr) = state.attributes.remove(&key) {
3346 deleted.push(json!({
3347 "name": attr.name,
3348 "value": attr.value,
3349 "targetType": attr.target_type,
3350 "targetId": attr.target_id,
3351 }));
3352 }
3353 }
3354 Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
3355 }
3356
3357 fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3358 let body = request.json_body();
3359 let cluster_ref = opt_str(&body, "cluster");
3360 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3361 let target_type = req_str(&body, "targetType")?.to_string();
3362 let attr_name = opt_str(&body, "attributeName");
3363 let attr_value = opt_str(&body, "attributeValue");
3364
3365 let accounts = self.state.read();
3366 let attrs: Vec<Value> = match accounts.get(&request.account_id) {
3367 Some(state) => state
3368 .attributes
3369 .values()
3370 .filter(|a| a.cluster_name == cluster_name)
3371 .filter(|a| a.target_type == target_type)
3372 .filter(|a| attr_name.is_none_or(|n| a.name == n))
3373 .filter(|a| attr_value.is_none_or(|v| a.value.as_deref() == Some(v)))
3374 .map(|a| {
3375 json!({
3376 "name": a.name,
3377 "value": a.value,
3378 "targetType": a.target_type,
3379 "targetId": a.target_id,
3380 })
3381 })
3382 .collect(),
3383 None => Vec::new(),
3384 };
3385 Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
3386 }
3387
3388 fn create_capacity_provider(
3389 &self,
3390 request: &AwsRequest,
3391 ) -> Result<AwsResponse, AwsServiceError> {
3392 let body = request.json_body();
3393 let name = req_str(&body, "name")?.to_string();
3394 if name.starts_with("aws") || name.starts_with("ecs") {
3395 return Err(invalid_parameter(format!(
3396 "Capacity provider name cannot begin with 'aws' or 'ecs': {name}"
3397 )));
3398 }
3399 let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
3400 let tags = parse_tags(&body);
3401
3402 let mut accounts = self.state.write();
3403 let state = accounts.get_or_create(&request.account_id);
3404 if state.capacity_providers.contains_key(&name) {
3405 return Err(client_exception(format!(
3406 "Capacity provider already exists: {name}"
3407 )));
3408 }
3409 let arn = format!(
3410 "arn:aws:ecs:{}:{}:capacity-provider/{}",
3411 state.region, state.account_id, name
3412 );
3413 let cp = CapacityProvider {
3414 name: name.clone(),
3415 arn,
3416 status: "ACTIVE".into(),
3417 auto_scaling_group_provider,
3418 update_status: None,
3419 update_status_reason: None,
3420 created_at: Utc::now(),
3421 tags,
3422 };
3423 state.capacity_providers.insert(name.clone(), cp.clone());
3424 Ok(AwsResponse::ok_json(json!({
3425 "capacityProvider": capacity_provider_to_json(&cp),
3426 })))
3427 }
3428
3429 fn delete_capacity_provider(
3430 &self,
3431 request: &AwsRequest,
3432 ) -> Result<AwsResponse, AwsServiceError> {
3433 let body = request.json_body();
3434 let input = req_str(&body, "capacityProvider")?.to_string();
3435 let name = capacity_provider_name_from_ref(&input);
3436 let mut accounts = self.state.write();
3437 let state = accounts
3438 .get_mut(&request.account_id)
3439 .ok_or_else(|| capacity_provider_not_found(&name))?;
3440 let mut cp = state
3441 .capacity_providers
3442 .remove(&name)
3443 .ok_or_else(|| capacity_provider_not_found(&name))?;
3444 cp.status = "INACTIVE".into();
3445 Ok(AwsResponse::ok_json(json!({
3446 "capacityProvider": capacity_provider_to_json(&cp),
3447 })))
3448 }
3449
3450 fn describe_capacity_providers(
3451 &self,
3452 request: &AwsRequest,
3453 ) -> Result<AwsResponse, AwsServiceError> {
3454 let body = request.json_body();
3455 let names: Vec<String> = body
3456 .get("capacityProviders")
3457 .and_then(|v| v.as_array())
3458 .map(|arr| {
3459 arr.iter()
3460 .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
3461 .collect()
3462 })
3463 .unwrap_or_default();
3464 let accounts = self.state.read();
3465 let mut found = Vec::new();
3466 let mut failures = Vec::new();
3467 if let Some(state) = accounts.get(&request.account_id) {
3468 if names.is_empty() {
3469 for cp in state.capacity_providers.values() {
3470 found.push(capacity_provider_to_json(cp));
3471 }
3472 } else {
3473 for n in &names {
3474 match state.capacity_providers.get(n) {
3475 Some(cp) => found.push(capacity_provider_to_json(cp)),
3476 None => failures.push(json!({"arn": n, "reason": "MISSING"})),
3477 }
3478 }
3479 }
3480 }
3481 Ok(AwsResponse::ok_json(json!({
3482 "capacityProviders": found,
3483 "failures": failures,
3484 })))
3485 }
3486
3487 fn update_capacity_provider(
3488 &self,
3489 request: &AwsRequest,
3490 ) -> Result<AwsResponse, AwsServiceError> {
3491 let body = request.json_body();
3492 let input = req_str(&body, "name")?.to_string();
3493 let name = capacity_provider_name_from_ref(&input);
3494 let asg = body.get("autoScalingGroupProvider").cloned();
3495 let mut accounts = self.state.write();
3496 let state = accounts
3497 .get_mut(&request.account_id)
3498 .ok_or_else(|| capacity_provider_not_found(&name))?;
3499 let cp = state
3500 .capacity_providers
3501 .get_mut(&name)
3502 .ok_or_else(|| capacity_provider_not_found(&name))?;
3503 if let Some(v) = asg {
3504 cp.auto_scaling_group_provider = Some(v);
3505 }
3506 cp.update_status = Some("UPDATE_COMPLETE".into());
3507 Ok(AwsResponse::ok_json(json!({
3508 "capacityProvider": capacity_provider_to_json(cp),
3509 })))
3510 }
3511
3512 fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3513 let body = request.json_body();
3514 let refs: Vec<String> = body
3515 .get("tasks")
3516 .and_then(|v| v.as_array())
3517 .map(|arr| {
3518 arr.iter()
3519 .filter_map(|v| v.as_str().map(String::from))
3520 .collect()
3521 })
3522 .unwrap_or_default();
3523 let accounts = self.state.read();
3524 let mut protections = Vec::new();
3525 let mut failures = Vec::new();
3526 if let Some(state) = accounts.get(&request.account_id) {
3527 for r in &refs {
3528 let id = task_id_from_ref(r);
3529 match state.tasks.get(&id) {
3530 Some(t) => protections.push(task_protection_json(t)),
3531 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3532 }
3533 }
3534 }
3535 Ok(AwsResponse::ok_json(json!({
3536 "protectedTasks": protections,
3537 "failures": failures,
3538 })))
3539 }
3540
3541 fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3542 let body = request.json_body();
3543 let refs: Vec<String> = body
3544 .get("tasks")
3545 .and_then(|v| v.as_array())
3546 .map(|arr| {
3547 arr.iter()
3548 .filter_map(|v| v.as_str().map(String::from))
3549 .collect()
3550 })
3551 .unwrap_or_default();
3552 let protect = body
3553 .get("protectionEnabled")
3554 .and_then(|v| v.as_bool())
3555 .unwrap_or(false);
3556 let expires_in_minutes = body
3557 .get("expiresInMinutes")
3558 .and_then(|v| v.as_i64())
3559 .unwrap_or(2880);
3560 let expiration = if protect {
3561 Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
3562 } else {
3563 None
3564 };
3565
3566 let mut accounts = self.state.write();
3567 let state = accounts
3568 .get_mut(&request.account_id)
3569 .ok_or_else(|| client_exception("account not found"))?;
3570 let mut protections = Vec::new();
3571 let mut failures = Vec::new();
3572 for r in &refs {
3573 let id = task_id_from_ref(r);
3574 match state.tasks.get_mut(&id) {
3575 Some(t) => {
3576 t.protection = Some(crate::state::TaskProtection {
3577 enabled: protect,
3578 expiration,
3579 });
3580 protections.push(task_protection_json(t));
3581 }
3582 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3583 }
3584 }
3585 Ok(AwsResponse::ok_json(json!({
3586 "protectedTasks": protections,
3587 "failures": failures,
3588 })))
3589 }
3590
3591 fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3592 let body = request.json_body();
3593 let service_ref = req_str(&body, "service")?;
3594 let service_name = service_name_from_ref(service_ref);
3595 let cluster_ref = opt_str(&body, "cluster");
3596 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3597 let task_definition = req_str(&body, "taskDefinition")?.to_string();
3598 let external_id = opt_str(&body, "externalId").map(String::from);
3599 let launch_type = opt_str(&body, "launchType").map(String::from);
3600 let platform_version = opt_str(&body, "platformVersion").map(String::from);
3601 let scale = body.get("scale").cloned();
3602 let tags = parse_tags(&body);
3603 let load_balancers = body
3604 .get("loadBalancers")
3605 .and_then(|v| v.as_array())
3606 .cloned()
3607 .unwrap_or_default();
3608 let service_registries = body
3609 .get("serviceRegistries")
3610 .and_then(|v| v.as_array())
3611 .cloned()
3612 .unwrap_or_default();
3613 let capacity_provider_strategy = body
3614 .get("capacityProviderStrategy")
3615 .and_then(|v| v.as_array())
3616 .cloned()
3617 .unwrap_or_default();
3618
3619 let mut accounts = self.state.write();
3620 let state = accounts
3621 .get_mut(&request.account_id)
3622 .ok_or_else(|| service_not_found(&service_name))?;
3623 let service_key = EcsState::service_key(&cluster_name, &service_name);
3624 let svc = state
3625 .services
3626 .get(&service_key)
3627 .ok_or_else(|| service_not_found(&service_name))?;
3628 if svc.deployment_controller != "EXTERNAL" {
3629 return Err(client_exception(
3630 "CreateTaskSet requires the service to be created with \
3631 deploymentController.type = EXTERNAL",
3632 ));
3633 }
3634 let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
3635 let task_set = TaskSet {
3636 task_set_id: ts_id.clone(),
3637 task_set_arn: format!(
3638 "arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
3639 state.region, state.account_id, cluster_name, service_name, ts_id
3640 ),
3641 service_arn: svc.service_arn.clone(),
3642 cluster_arn: svc.cluster_arn.clone(),
3643 service_name: service_name.clone(),
3644 cluster_name: cluster_name.clone(),
3645 external_id,
3646 status: "ACTIVE".into(),
3647 task_definition,
3648 computed_desired_count: 0,
3649 pending_count: 0,
3650 running_count: 0,
3651 launch_type,
3652 platform_version,
3653 scale,
3654 stability_status: "STABILIZING".into(),
3655 created_at: Utc::now(),
3656 updated_at: Utc::now(),
3657 load_balancers,
3658 service_registries,
3659 capacity_provider_strategy,
3660 tags,
3661 };
3662 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3663 state.task_sets.insert(key, task_set.clone());
3664 Ok(AwsResponse::ok_json(json!({
3665 "taskSet": task_set_to_json(&task_set),
3666 })))
3667 }
3668
3669 fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3670 let body = request.json_body();
3671 let ts_ref = req_str(&body, "taskSet")?.to_string();
3672 let service_ref = req_str(&body, "service")?;
3673 let service_name = service_name_from_ref(service_ref);
3674 let cluster_ref = opt_str(&body, "cluster");
3675 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3676 let scale = body.get("scale").cloned();
3677
3678 let mut accounts = self.state.write();
3679 let state = accounts
3680 .get_mut(&request.account_id)
3681 .ok_or_else(|| client_exception("task set not found"))?;
3682 let ts_id = task_set_id_from_ref(&ts_ref);
3683 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3684 let ts = state
3685 .task_sets
3686 .get_mut(&key)
3687 .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3688 if let Some(v) = scale {
3689 ts.scale = Some(v);
3690 }
3691 ts.updated_at = Utc::now();
3692 Ok(AwsResponse::ok_json(json!({
3693 "taskSet": task_set_to_json(ts),
3694 })))
3695 }
3696
3697 fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3698 let body = request.json_body();
3699 let ts_ref = req_str(&body, "taskSet")?.to_string();
3700 let service_ref = req_str(&body, "service")?;
3701 let service_name = service_name_from_ref(service_ref);
3702 let cluster_ref = opt_str(&body, "cluster");
3703 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3704 let ts_id = task_set_id_from_ref(&ts_ref);
3705 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3706
3707 let mut accounts = self.state.write();
3708 let state = accounts
3709 .get_mut(&request.account_id)
3710 .ok_or_else(|| client_exception("task set not found"))?;
3711 let mut ts = state
3712 .task_sets
3713 .remove(&key)
3714 .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3715 ts.status = "DRAINING".into();
3716 Ok(AwsResponse::ok_json(json!({
3717 "taskSet": task_set_to_json(&ts),
3718 })))
3719 }
3720
3721 fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3722 let body = request.json_body();
3723 let service_ref = req_str(&body, "service")?;
3724 let service_name = service_name_from_ref(service_ref);
3725 let cluster_ref = opt_str(&body, "cluster");
3726 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3727 let filter_refs: Vec<String> = body
3728 .get("taskSets")
3729 .and_then(|v| v.as_array())
3730 .map(|arr| {
3731 arr.iter()
3732 .filter_map(|v| v.as_str().map(String::from))
3733 .collect()
3734 })
3735 .unwrap_or_default();
3736
3737 let accounts = self.state.read();
3738 let mut found = Vec::new();
3739 let mut failures = Vec::new();
3740 if let Some(state) = accounts.get(&request.account_id) {
3741 if filter_refs.is_empty() {
3742 for ts in state.task_sets.values() {
3743 if ts.cluster_name == cluster_name && ts.service_name == service_name {
3744 found.push(task_set_to_json(ts));
3745 }
3746 }
3747 } else {
3748 for r in &filter_refs {
3749 let id = task_set_id_from_ref(r);
3750 let key = format!("{}/{}/{}", cluster_name, service_name, id);
3751 match state.task_sets.get(&key) {
3752 Some(ts) => found.push(task_set_to_json(ts)),
3753 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3754 }
3755 }
3756 }
3757 }
3758 Ok(AwsResponse::ok_json(json!({
3759 "taskSets": found,
3760 "failures": failures,
3761 })))
3762 }
3763
3764 fn update_service_primary_task_set(
3765 &self,
3766 request: &AwsRequest,
3767 ) -> Result<AwsResponse, AwsServiceError> {
3768 let body = request.json_body();
3769 let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
3770 let service_ref = req_str(&body, "service")?;
3771 let service_name = service_name_from_ref(service_ref);
3772 let cluster_ref = opt_str(&body, "cluster");
3773 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3774 let ts_id = task_set_id_from_ref(&ts_ref);
3775 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3776 let mut accounts = self.state.write();
3777 let state = accounts
3778 .get_mut(&request.account_id)
3779 .ok_or_else(|| client_exception("task set not found"))?;
3780 if !state.task_sets.contains_key(&key) {
3781 return Err(client_exception(format!("task set not found: {}", ts_ref)));
3782 }
3783 for ts in state.task_sets.values_mut() {
3787 if ts.service_name == service_name
3788 && ts.cluster_name == cluster_name
3789 && ts.status == "PRIMARY"
3790 && ts.task_set_id != ts_id
3791 {
3792 ts.status = "ACTIVE".into();
3793 ts.updated_at = Utc::now();
3794 }
3795 }
3796 let ts = state.task_sets.get_mut(&key).unwrap();
3797 ts.status = "PRIMARY".into();
3798 ts.updated_at = Utc::now();
3799 Ok(AwsResponse::ok_json(json!({
3800 "taskSet": task_set_to_json(ts),
3801 })))
3802 }
3803
3804 async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3805 let body = request.json_body();
3806 let task_ref = req_str(&body, "task")?.to_string();
3807 let command = req_str(&body, "command")?.to_string();
3808 let interactive = body
3809 .get("interactive")
3810 .and_then(|v| v.as_bool())
3811 .unwrap_or(false);
3812
3813 let container_id = {
3815 let accounts = self.state.read();
3816 let state = accounts
3817 .get(&request.account_id)
3818 .ok_or_else(|| task_not_found(&task_ref))?;
3819 let id = task_id_from_ref(&task_ref);
3820 state
3821 .tasks
3822 .get(&id)
3823 .and_then(|t| t.containers.first())
3824 .and_then(|c| c.runtime_id.clone())
3825 };
3826
3827 let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
3828 if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
3829 let out = tokio::process::Command::new("docker")
3834 .args(["exec", &id, "sh", "-c", &command])
3835 .output()
3836 .await
3837 .map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
3838 tracing::info!(
3839 task = %task_ref,
3840 exit = out.status.code().unwrap_or(-1),
3841 "ExecuteCommand via docker exec"
3842 );
3843 }
3844
3845 Ok(AwsResponse::ok_json(json!({
3846 "clusterArn": opt_str(&body, "cluster").unwrap_or(""),
3847 "containerArn": container_id.unwrap_or_default(),
3848 "containerName": opt_str(&body, "container").unwrap_or(""),
3849 "interactive": interactive,
3850 "session": {
3851 "sessionId": session_id,
3852 "streamUrl": "",
3853 "tokenValue": "",
3854 },
3855 "taskArn": task_ref,
3856 })))
3857 }
3858
3859 fn submit_container_state_change(
3860 &self,
3861 request: &AwsRequest,
3862 ) -> Result<AwsResponse, AwsServiceError> {
3863 let body = request.json_body();
3868 let task_ref = opt_str(&body, "task").unwrap_or("");
3869 let container_name = opt_str(&body, "containerName").unwrap_or("");
3870 let status = opt_str(&body, "status").map(String::from);
3871 let exit_code = body.get("exitCode").and_then(|v| v.as_i64());
3872 let reason = opt_str(&body, "reason").map(String::from);
3873
3874 if !task_ref.is_empty() {
3875 let mut accounts = self.state.write();
3876 if let Some(state) = accounts.get_mut(&request.account_id) {
3877 let id = task_id_from_ref(task_ref);
3878 if let Some(task) = state.tasks.get_mut(&id) {
3879 if let Some(container) = task
3880 .containers
3881 .iter_mut()
3882 .find(|c| c.name == container_name)
3883 {
3884 if let Some(s) = status {
3885 container.last_status = s;
3886 }
3887 if let Some(code) = exit_code {
3888 container.exit_code = Some(code);
3889 }
3890 if let Some(r) = reason {
3891 container.reason = Some(r);
3892 }
3893 }
3894 }
3895 }
3896 }
3897 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3898 }
3899
3900 fn submit_task_state_change(
3901 &self,
3902 request: &AwsRequest,
3903 ) -> Result<AwsResponse, AwsServiceError> {
3904 let body = request.json_body();
3905 let task_ref = opt_str(&body, "task").unwrap_or("");
3906 let status = opt_str(&body, "status").map(String::from);
3907 if !task_ref.is_empty() {
3908 let mut accounts = self.state.write();
3909 if let Some(state) = accounts.get_mut(&request.account_id) {
3910 let id = task_id_from_ref(task_ref);
3911 if let Some(task) = state.tasks.get_mut(&id) {
3912 if let Some(s) = status {
3913 task.last_status = s;
3914 }
3915 }
3916 }
3917 }
3918 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3919 }
3920
3921 fn submit_attachment_state_changes(
3922 &self,
3923 _request: &AwsRequest,
3924 ) -> Result<AwsResponse, AwsServiceError> {
3925 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3928 }
3929
3930 fn discover_poll_endpoint(
3931 &self,
3932 _request: &AwsRequest,
3933 ) -> Result<AwsResponse, AwsServiceError> {
3934 let accounts = self.state.read();
3938 let endpoint = format!("https://ecs.{}.amazonaws.com/", accounts.region());
3939 Ok(AwsResponse::ok_json(json!({
3940 "endpoint": endpoint,
3941 "telemetryEndpoint": endpoint,
3942 "serviceConnectEndpoint": endpoint,
3943 })))
3944 }
3945
3946 fn stop_service_deployment(
3947 &self,
3948 request: &AwsRequest,
3949 ) -> Result<AwsResponse, AwsServiceError> {
3950 let body = request.json_body();
3951 let deployment_ref = req_str(&body, "serviceDeploymentArn")?.to_string();
3952 let mut accounts = self.state.write();
3953 let state = accounts
3954 .get_mut(&request.account_id)
3955 .ok_or_else(|| client_exception("service deployment not found"))?;
3956 for svc in state.services.values_mut() {
3957 for d in svc.deployments.iter_mut() {
3958 if deployment_ref.contains(&d.deployment_id) {
3959 d.status = "STOPPED".into();
3960 d.rollout_state = "FAILED".into();
3961 d.rollout_state_reason = Some("StopServiceDeployment requested".into());
3962 d.updated_at = Utc::now();
3963 return Ok(AwsResponse::ok_json(json!({
3964 "serviceDeployment": deployment_to_json(d),
3965 })));
3966 }
3967 }
3968 }
3969 Err(client_exception(format!(
3970 "service deployment not found: {deployment_ref}"
3971 )))
3972 }
3973
3974 fn list_service_deployments(
3975 &self,
3976 request: &AwsRequest,
3977 ) -> Result<AwsResponse, AwsServiceError> {
3978 let body = request.json_body();
3979 let service_ref = req_str(&body, "service")?;
3980 let service_name = service_name_from_ref(service_ref);
3981 let cluster_ref = opt_str(&body, "cluster");
3982 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3983 let accounts = self.state.read();
3984 let mut deployments: Vec<Value> = Vec::new();
3985 if let Some(state) = accounts.get(&request.account_id) {
3986 let key = EcsState::service_key(&cluster_name, &service_name);
3987 if let Some(svc) = state.services.get(&key) {
3988 for d in &svc.deployments {
3989 deployments.push(json!({
3990 "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
3991 "serviceArn": svc.service_arn,
3992 "clusterArn": svc.cluster_arn,
3993 "status": d.status,
3994 "createdAt": d.created_at.timestamp(),
3995 "startedAt": d.created_at.timestamp(),
3996 "finishedAt": null,
3997 }));
3998 }
3999 }
4000 }
4001 Ok(AwsResponse::ok_json(json!({
4002 "serviceDeployments": deployments,
4003 })))
4004 }
4005
4006 fn describe_service_deployments(
4007 &self,
4008 request: &AwsRequest,
4009 ) -> Result<AwsResponse, AwsServiceError> {
4010 let body = request.json_body();
4011 let refs: Vec<String> = body
4012 .get("serviceDeploymentArns")
4013 .and_then(|v| v.as_array())
4014 .map(|arr| {
4015 arr.iter()
4016 .filter_map(|v| v.as_str().map(String::from))
4017 .collect()
4018 })
4019 .unwrap_or_default();
4020 let accounts = self.state.read();
4021 let mut found = Vec::new();
4022 let mut failures = Vec::new();
4023 if let Some(state) = accounts.get(&request.account_id) {
4024 'next_ref: for r in &refs {
4025 for svc in state.services.values() {
4026 for d in &svc.deployments {
4027 if r.contains(&d.deployment_id) {
4028 found.push(json!({
4029 "serviceDeploymentArn": r,
4030 "serviceArn": svc.service_arn,
4031 "clusterArn": svc.cluster_arn,
4032 "status": d.status,
4033 "createdAt": d.created_at.timestamp(),
4034 "startedAt": d.created_at.timestamp(),
4035 "finishedAt": null,
4036 "deploymentConfiguration": {
4037 "minimumHealthyPercent": svc.minimum_healthy_percent,
4038 "maximumPercent": svc.maximum_percent,
4039 },
4040 "sourceServiceRevisions": [],
4041 "targetServiceRevision": {
4042 "arn": d.task_definition_arn,
4043 "requestedTaskCount": d.desired_count,
4044 "runningTaskCount": d.running_count,
4045 "pendingTaskCount": d.pending_count,
4046 "failedTasks": d.failed_tasks,
4047 },
4048 }));
4049 continue 'next_ref;
4050 }
4051 }
4052 }
4053 failures.push(json!({"arn": r, "reason": "MISSING"}));
4054 }
4055 }
4056 Ok(AwsResponse::ok_json(json!({
4057 "serviceDeployments": found,
4058 "failures": failures,
4059 })))
4060 }
4061
4062 fn describe_service_revisions(
4063 &self,
4064 request: &AwsRequest,
4065 ) -> Result<AwsResponse, AwsServiceError> {
4066 let body = request.json_body();
4067 let refs: Vec<String> = body
4068 .get("serviceRevisionArns")
4069 .and_then(|v| v.as_array())
4070 .map(|arr| {
4071 arr.iter()
4072 .filter_map(|v| v.as_str().map(String::from))
4073 .collect()
4074 })
4075 .unwrap_or_default();
4076 Ok(AwsResponse::ok_json(json!({
4077 "serviceRevisions": [],
4078 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
4079 })))
4080 }
4081}
4082
/// Returns the text following the last `/` in `input`, or all of `input`
/// when it contains no `/`.
fn container_instance_id_from_ref(input: &str) -> String {
    let id = match input.rsplit_once('/') {
        Some((_, tail)) => tail,
        None => input,
    };
    id.to_owned()
}
4086
4087fn container_instance_not_found(input: &str) -> AwsServiceError {
4088 AwsServiceError::aws_error(
4089 StatusCode::BAD_REQUEST,
4090 "ClientException",
4091 format!("Container instance not found: {input}"),
4092 )
4093}
4094
/// Returns the portion of `input` after its final `/`; an input without any
/// `/` is returned whole.
fn capacity_provider_name_from_ref(input: &str) -> String {
    // `/` is a single byte, so the index right after it is a valid boundary.
    let start = input.rfind('/').map_or(0, |slash| slash + 1);
    input[start..].to_owned()
}
4098
4099fn capacity_provider_not_found(name: &str) -> AwsServiceError {
4100 AwsServiceError::aws_error(
4101 StatusCode::BAD_REQUEST,
4102 "ClientException",
4103 format!("Capacity provider not found: {name}"),
4104 )
4105}
4106
/// Returns the last `/`-separated segment of `input` as an owned string; an
/// input without `/` is returned unchanged.
fn task_set_id_from_ref(input: &str) -> String {
    match input.split('/').next_back() {
        Some(segment) => segment.to_owned(),
        None => input.to_owned(),
    }
}
4110
4111fn container_instance_to_json(ci: &ContainerInstance) -> Value {
4112 json!({
4113 "containerInstanceArn": ci.container_instance_arn,
4114 "ec2InstanceId": ci.ec2_instance_id,
4115 "status": ci.status,
4116 "version": ci.version,
4117 "versionInfo": ci.version_info,
4118 "agentConnected": ci.agent_connected,
4119 "agentUpdateStatus": ci.agent_update_status,
4120 "remainingResources": ci.remaining_resources,
4121 "registeredResources": ci.registered_resources,
4122 "runningTasksCount": ci.running_tasks_count,
4123 "pendingTasksCount": ci.pending_tasks_count,
4124 "registeredAt": ci.registered_at.timestamp(),
4125 "attributes": ci.attributes.iter().map(|a| json!({
4126 "name": a.name,
4127 "value": a.value,
4128 "targetType": a.target_type,
4129 "targetId": a.target_id,
4130 })).collect::<Vec<_>>(),
4131 "tags": tags_json(&ci.tags),
4132 "capacityProviderName": ci.capacity_provider_name,
4133 "healthStatus": ci.health_status,
4134 })
4135}
4136
4137fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
4138 json!({
4139 "name": cp.name,
4140 "capacityProviderArn": cp.arn,
4141 "status": cp.status,
4142 "autoScalingGroupProvider": cp.auto_scaling_group_provider,
4143 "updateStatus": cp.update_status,
4144 "updateStatusReason": cp.update_status_reason,
4145 "tags": tags_json(&cp.tags),
4146 })
4147}
4148
4149fn task_set_to_json(ts: &TaskSet) -> Value {
4150 json!({
4151 "id": ts.task_set_id,
4152 "taskSetArn": ts.task_set_arn,
4153 "serviceArn": ts.service_arn,
4154 "clusterArn": ts.cluster_arn,
4155 "externalId": ts.external_id,
4156 "status": ts.status,
4157 "taskDefinition": ts.task_definition,
4158 "computedDesiredCount": ts.computed_desired_count,
4159 "pendingCount": ts.pending_count,
4160 "runningCount": ts.running_count,
4161 "launchType": ts.launch_type,
4162 "platformVersion": ts.platform_version,
4163 "scale": ts.scale,
4164 "stabilityStatus": ts.stability_status,
4165 "createdAt": ts.created_at.timestamp(),
4166 "updatedAt": ts.updated_at.timestamp(),
4167 "loadBalancers": ts.load_balancers,
4168 "serviceRegistries": ts.service_registries,
4169 "capacityProviderStrategy": ts.capacity_provider_strategy,
4170 "tags": tags_json(&ts.tags),
4171 })
4172}
4173
4174fn task_protection_json(task: &Task) -> Value {
4175 let p = task.protection.as_ref();
4176 json!({
4177 "taskArn": task.task_arn,
4178 "protectionEnabled": p.map(|p| p.enabled).unwrap_or(false),
4179 "expirationDate": p.and_then(|p| p.expiration).map(|e| e.timestamp()),
4180 })
4181}
4182
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal `api` service fixture living in the `default` cluster.
    fn api_service() -> Service {
        Service {
            service_name: "api".into(),
            service_arn: "arn".into(),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            task_definition_arn: "td-arn".into(),
            family: "td".into(),
            revision: 1,
            desired_count: 0,
            running_count: 0,
            pending_count: 0,
            launch_type: "FARGATE".into(),
            status: "ACTIVE".into(),
            scheduling_strategy: "REPLICA".into(),
            deployment_controller: "ECS".into(),
            minimum_healthy_percent: None,
            maximum_percent: None,
            circuit_breaker: None,
            deployments: vec![],
            load_balancers: vec![],
            service_registries: vec![],
            placement_constraints: vec![],
            placement_strategy: vec![],
            network_configuration: None,
            tags: vec![],
            created_at: chrono::Utc::now(),
            created_by: None,
            role_arn: None,
        }
    }

    /// Minimal active container-instance fixture living in the `default` cluster.
    fn active_container_instance() -> ContainerInstance {
        ContainerInstance {
            container_instance_arn: "arn".into(),
            ec2_instance_id: Some("i-x".into()),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            status: "ACTIVE".into(),
            version: 0,
            version_info: None,
            agent_connected: true,
            agent_update_status: None,
            remaining_resources: vec![],
            registered_resources: vec![],
            running_tasks_count: 0,
            pending_tasks_count: 0,
            registered_at: chrono::Utc::now(),
            attributes: vec![],
            tags: vec![],
            capacity_provider_name: None,
            health_status: None,
        }
    }

    /// Shorthand for a single tag entry.
    fn tag(key: &str, value: &str) -> TagEntry {
        TagEntry {
            key: key.into(),
            value: value.into(),
        }
    }

    #[test]
    fn parse_family_revision_with_revision() {
        let (family, revision) = parse_family_revision("web:3");
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn parse_family_revision_without_revision() {
        let (family, revision) = parse_family_revision("web");
        assert_eq!(family, "web");
        assert_eq!(revision, None);
    }

    #[test]
    fn parse_family_revision_non_numeric_treated_as_no_revision() {
        // A non-numeric suffix stays part of the family name.
        let (family, revision) = parse_family_revision("web:latest");
        assert_eq!(family, "web:latest");
        assert_eq!(revision, None);
    }

    #[test]
    fn decode_ecs_arn_cluster() {
        let (account_id, resource_type, resource) =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod").unwrap();
        assert_eq!(account_id, "111122223333");
        assert_eq!(resource_type, "cluster");
        assert_eq!(resource, "prod");
    }

    #[test]
    fn decode_ecs_arn_task_definition() {
        let (account_id, resource_type, resource) =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5").unwrap();
        assert_eq!(account_id, "111122223333");
        assert_eq!(resource_type, "task-definition");
        assert_eq!(resource, "web:5");
    }

    #[test]
    fn decode_ecs_arn_rejects_non_ecs() {
        let outcome = decode_ecs_arn("arn:aws:s3:::bucket");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_service_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .services
            .insert("default/api".to_string(), api_service());

        let expected = Some("default/api".to_string());
        // Fully qualified `cluster/service` form.
        assert_eq!(resolve_service_key(&state, "default/api"), expected);
        // Bare service name.
        assert_eq!(resolve_service_key(&state, "api"), expected);
        // Unknown service.
        assert_eq!(resolve_service_key(&state, "nope"), None);
    }

    #[test]
    fn resolve_container_instance_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .container_instances
            .insert("default/abc-123".to_string(), active_container_instance());

        let expected = Some("default/abc-123".to_string());
        assert_eq!(
            resolve_container_instance_key(&state, "default/abc-123"),
            expected
        );
        assert_eq!(resolve_container_instance_key(&state, "abc-123"), expected);
        assert_eq!(resolve_container_instance_key(&state, "nope"), None);
    }

    #[test]
    fn validate_family_name_accepts_hyphen_underscore() {
        let outcome = validate_family_name("web_server-2");
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_family_name_rejects_empty() {
        let outcome = validate_family_name("");
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_family_name_rejects_slash() {
        let outcome = validate_family_name("web/server");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_task_definition_ref_bare_family() {
        let (account, family, revision) = resolve_task_definition_ref("web").unwrap();
        assert!(account.is_none());
        assert_eq!(family, "web");
        assert!(revision.is_none());
    }

    #[test]
    fn resolve_task_definition_ref_family_revision() {
        let (account, family, revision) = resolve_task_definition_ref("web:3").unwrap();
        assert!(account.is_none());
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn resolve_task_definition_ref_full_arn() {
        let arn = "arn:aws:ecs:us-east-1:111122223333:task-definition/web:3";
        let (account, family, revision) = resolve_task_definition_ref(arn).unwrap();
        assert_eq!(account.as_deref(), Some("111122223333"));
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn merge_tags_replaces_existing_value() {
        let mut tags = vec![tag("env", "dev")];
        merge_tags(&mut tags, vec![tag("env", "prod")]);
        assert_eq!(tags.len(), 1);
        assert_eq!(tags[0].value, "prod");
    }

    #[test]
    fn merge_tags_adds_new() {
        let mut tags = vec![tag("env", "dev")];
        merge_tags(&mut tags, vec![tag("team", "platform")]);
        assert_eq!(tags.len(), 2);
    }

    #[test]
    fn parse_tags_reads_lowercase_keys() {
        let request_body = json!({
            "tags": [
                {"key": "env", "value": "prod"},
                {"key": "team", "value": "platform"},
            ]
        });
        let parsed = parse_tags(&request_body);
        assert_eq!(parsed.len(), 2);
        let first = &parsed[0];
        assert_eq!(first.key, "env");
        assert_eq!(first.value, "prod");
    }

    #[test]
    fn matches_filter_respects_none() {
        // No filter accepts everything.
        assert!(matches_filter(None, "anything"));
        // A filter accepts only the identical value.
        assert!(matches_filter(Some("x"), "x"));
        assert!(!matches_filter(Some("x"), "y"));
    }
}