1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
10use fakecloud_persistence::SnapshotStore;
11
12use crate::state::{
13 Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
14 Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
15 TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
16};
17
/// Action names reported by [`EcsService::supported_actions`].
///
/// Each entry has a matching dispatch arm in `EcsService::handle`; any other
/// action name falls through to the "action not implemented" error there.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    "GetTaskProtection",
    "UpdateTaskProtection",
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    "StopServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
];
80
/// Reports whether `action` names an operation that changes stored ECS state.
///
/// `handle` consults this to decide whether a successful request should be
/// followed by a snapshot write. Names not listed here (including unknown
/// actions) are treated as non-mutating.
fn is_mutating(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateCluster",
        "DeleteCluster",
        "UpdateCluster",
        "UpdateClusterSettings",
        "PutClusterCapacityProviders",
        "RegisterTaskDefinition",
        "DeregisterTaskDefinition",
        "DeleteTaskDefinitions",
        "TagResource",
        "UntagResource",
        "PutAccountSetting",
        "PutAccountSettingDefault",
        "DeleteAccountSetting",
        "RunTask",
        "StartTask",
        "StopTask",
        "CreateService",
        "UpdateService",
        "DeleteService",
        "RegisterContainerInstance",
        "DeregisterContainerInstance",
        "UpdateContainerAgent",
        "UpdateContainerInstancesState",
        "PutAttributes",
        "DeleteAttributes",
        "CreateCapacityProvider",
        "DeleteCapacityProvider",
        "UpdateCapacityProvider",
        "UpdateTaskProtection",
        "CreateTaskSet",
        "UpdateTaskSet",
        "DeleteTaskSet",
        "UpdateServicePrimaryTaskSet",
        "SubmitContainerStateChange",
        "SubmitTaskStateChange",
        "SubmitAttachmentStateChanges",
        "StopServiceDeployment",
    ];
    MUTATING_ACTIONS.contains(&action)
}
123
/// ECS service front end: dispatches requests against shared state and can
/// persist a snapshot after successful mutating actions.
pub struct EcsService {
    /// Shared ECS state read and written by the request handlers.
    state: SharedEcsState,
    /// Destination for snapshots; when `None`, `save_snapshot` does nothing.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the duration of `save_snapshot` so snapshot writes do not overlap.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Optional runtime attached through `with_runtime`.
    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
    /// Optional validator consulted by `check_pass_role`; when absent, role
    /// checks always pass.
    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
}
131
132impl EcsService {
133 pub fn new(state: SharedEcsState) -> Self {
134 Self {
135 state,
136 snapshot_store: None,
137 snapshot_lock: Arc::new(AsyncMutex::new(())),
138 runtime: None,
139 role_trust_validator: None,
140 }
141 }
142
143 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
144 self.snapshot_store = Some(store);
145 self
146 }
147
148 pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
149 self.runtime = Some(runtime);
150 self
151 }
152
153 pub fn with_role_trust_validator(
154 mut self,
155 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
156 ) -> Self {
157 self.role_trust_validator = Some(validator);
158 self
159 }
160
161 fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
162 let Some(ref validator) = self.role_trust_validator else {
163 return Ok(());
164 };
165 if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
166 return Err(AwsServiceError::aws_error(
167 StatusCode::BAD_REQUEST,
168 "InvalidParameterException",
169 err.to_string(),
170 ));
171 }
172 Ok(())
173 }
174
175 pub fn state_handle(&self) -> &SharedEcsState {
176 &self.state
177 }
178
179 async fn save_snapshot(&self) {
180 let Some(store) = self.snapshot_store.clone() else {
181 return;
182 };
183 let _guard = self.snapshot_lock.lock().await;
184 let snapshot = EcsSnapshot {
185 schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
186 accounts: Some(self.state.read().clone()),
187 };
188 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
189 let bytes = serde_json::to_vec(&snapshot)
190 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
191 store.save(&bytes)
192 })
193 .await;
194 match join {
195 Ok(Ok(())) => {}
196 Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
197 Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
198 }
199 }
200}
201
202#[async_trait]
203impl AwsService for EcsService {
204 fn service_name(&self) -> &str {
205 "ecs"
206 }
207
208 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
209 let mutates = is_mutating(request.action.as_str());
210 let result = match request.action.as_str() {
211 "CreateCluster" => self.create_cluster(&request),
212 "DescribeClusters" => self.describe_clusters(&request),
213 "DeleteCluster" => self.delete_cluster(&request),
214 "ListClusters" => self.list_clusters(&request),
215 "UpdateCluster" => self.update_cluster(&request),
216 "UpdateClusterSettings" => self.update_cluster_settings(&request),
217 "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
218 "RegisterTaskDefinition" => self.register_task_definition(&request),
219 "DescribeTaskDefinition" => self.describe_task_definition(&request),
220 "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
221 "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
222 "ListTaskDefinitions" => self.list_task_definitions(&request),
223 "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
224 "TagResource" => self.tag_resource(&request),
225 "UntagResource" => self.untag_resource(&request),
226 "ListTagsForResource" => self.list_tags_for_resource(&request),
227 "PutAccountSetting" => self.put_account_setting(&request),
228 "PutAccountSettingDefault" => self.put_account_setting_default(&request),
229 "DeleteAccountSetting" => self.delete_account_setting(&request),
230 "ListAccountSettings" => self.list_account_settings(&request),
231 "RunTask" => self.run_task(&request),
232 "StartTask" => self.start_task(&request),
233 "StopTask" => self.stop_task(&request).await,
234 "DescribeTasks" => self.describe_tasks(&request),
235 "ListTasks" => self.list_tasks(&request),
236 "CreateService" => self.create_service(&request),
237 "UpdateService" => self.update_service(&request),
238 "DeleteService" => self.delete_service(&request).await,
239 "DescribeServices" => self.describe_services(&request),
240 "ListServices" => self.list_services(&request),
241 "ListServicesByNamespace" => self.list_services_by_namespace(&request),
242 "RegisterContainerInstance" => self.register_container_instance(&request),
243 "DeregisterContainerInstance" => self.deregister_container_instance(&request),
244 "DescribeContainerInstances" => self.describe_container_instances(&request),
245 "ListContainerInstances" => self.list_container_instances(&request),
246 "UpdateContainerAgent" => self.update_container_agent(&request),
247 "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
248 "PutAttributes" => self.put_attributes(&request),
249 "DeleteAttributes" => self.delete_attributes(&request),
250 "ListAttributes" => self.list_attributes(&request),
251 "CreateCapacityProvider" => self.create_capacity_provider(&request),
252 "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
253 "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
254 "UpdateCapacityProvider" => self.update_capacity_provider(&request),
255 "GetTaskProtection" => self.get_task_protection(&request),
256 "UpdateTaskProtection" => self.update_task_protection(&request),
257 "CreateTaskSet" => self.create_task_set(&request),
258 "UpdateTaskSet" => self.update_task_set(&request),
259 "DeleteTaskSet" => self.delete_task_set(&request),
260 "DescribeTaskSets" => self.describe_task_sets(&request),
261 "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
262 "ExecuteCommand" => self.execute_command(&request).await,
263 "SubmitContainerStateChange" => self.submit_container_state_change(&request),
264 "SubmitTaskStateChange" => self.submit_task_state_change(&request),
265 "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
266 "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
267 "StopServiceDeployment" => self.stop_service_deployment(&request),
268 "ListServiceDeployments" => self.list_service_deployments(&request),
269 "DescribeServiceDeployments" => self.describe_service_deployments(&request),
270 "DescribeServiceRevisions" => self.describe_service_revisions(&request),
271 _ => Err(AwsServiceError::action_not_implemented(
272 "ecs",
273 &request.action,
274 )),
275 };
276 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
277 self.save_snapshot().await;
278 }
279 result
280 }
281
282 fn supported_actions(&self) -> &[&str] {
283 SUPPORTED_ACTIONS
284 }
285}
286
287fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
290 body.get(field)
291 .and_then(|v| v.as_str())
292 .ok_or_else(|| client_exception(format!("Missing required field: {field}")))
293}
294
295fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
296 body.get(field).and_then(|v| v.as_str())
297}
298
299fn client_exception(message: impl Into<String>) -> AwsServiceError {
300 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
301}
302
303fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
304 AwsServiceError::aws_error(
305 StatusCode::BAD_REQUEST,
306 "InvalidParameterException",
307 message,
308 )
309}
310
311fn cluster_not_found(name: &str) -> AwsServiceError {
312 AwsServiceError::aws_error(
313 StatusCode::BAD_REQUEST,
314 "ClusterNotFoundException",
315 format!("The referenced cluster was inactive: {name}"),
316 )
317}
318
319fn cluster_contains_services() -> AwsServiceError {
320 AwsServiceError::aws_error(
321 StatusCode::BAD_REQUEST,
322 "ClusterContainsServicesException",
323 "The specified cluster still contains active services",
324 )
325}
326
327fn cluster_contains_tasks() -> AwsServiceError {
328 AwsServiceError::aws_error(
329 StatusCode::BAD_REQUEST,
330 "ClusterContainsTasksException",
331 "The specified cluster still contains active tasks",
332 )
333}
334
335fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
336 AwsServiceError::aws_error(
337 StatusCode::BAD_REQUEST,
338 "ClientException",
339 format!("Unable to describe task definition: {family_rev}"),
340 )
341}
342
343fn parse_tags(body: &Value) -> Vec<TagEntry> {
344 body.get("tags")
345 .and_then(|v| v.as_array())
346 .map(|arr| {
347 arr.iter()
348 .filter_map(|t| {
349 let k = t.get("key").and_then(|v| v.as_str())?;
350 let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
351 Some(TagEntry {
352 key: k.to_string(),
353 value: v.to_string(),
354 })
355 })
356 .collect()
357 })
358 .unwrap_or_default()
359}
360
361fn tags_json(tags: &[TagEntry]) -> Value {
362 Value::Array(
363 tags.iter()
364 .map(|t| json!({"key": t.key, "value": t.value}))
365 .collect(),
366 )
367}
368
369fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
370 for new_tag in incoming {
371 if let Some(existing) = current.iter_mut().find(|t| t.key == new_tag.key) {
372 existing.value = new_tag.value;
373 } else {
374 current.push(new_tag);
375 }
376 }
377}
378
379fn cluster_to_json(cluster: &Cluster) -> Value {
380 json!({
381 "clusterArn": cluster.cluster_arn,
382 "clusterName": cluster.cluster_name,
383 "status": cluster.status,
384 "registeredContainerInstancesCount": cluster.registered_container_instances_count,
385 "runningTasksCount": cluster.running_tasks_count,
386 "pendingTasksCount": cluster.pending_tasks_count,
387 "activeServicesCount": cluster.active_services_count,
388 "statistics": cluster.statistics,
389 "tags": tags_json(&cluster.tags),
390 "settings": cluster.settings,
391 "configuration": cluster.configuration,
392 "capacityProviders": cluster.capacity_providers,
393 "defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
394 "attachments": cluster.attachments,
395 "attachmentsStatus": cluster.attachments_status,
396 "serviceConnectDefaults": cluster.service_connect_defaults,
397 })
398}
399
400fn task_definition_to_json(td: &TaskDefinition) -> Value {
401 let mut map = Map::new();
402 map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
403 map.insert("family".into(), json!(td.family));
404 map.insert("revision".into(), json!(td.revision));
405 map.insert("status".into(), json!(td.status));
406 map.insert(
407 "containerDefinitions".into(),
408 Value::Array(td.container_definitions.clone()),
409 );
410 map.insert("compatibilities".into(), json!(td.compatibilities));
411 map.insert(
412 "requiresCompatibilities".into(),
413 json!(td.requires_compatibilities),
414 );
415 map.insert("volumes".into(), Value::Array(td.volumes.clone()));
416 map.insert(
417 "placementConstraints".into(),
418 Value::Array(td.placement_constraints.clone()),
419 );
420 map.insert(
421 "requiresAttributes".into(),
422 Value::Array(td.requires_attributes.clone()),
423 );
424 map.insert(
425 "inferenceAccelerators".into(),
426 Value::Array(td.inference_accelerators.clone()),
427 );
428 if let Some(ref x) = td.network_mode {
429 map.insert("networkMode".into(), json!(x));
430 }
431 if let Some(ref x) = td.cpu {
432 map.insert("cpu".into(), json!(x));
433 }
434 if let Some(ref x) = td.memory {
435 map.insert("memory".into(), json!(x));
436 }
437 if let Some(ref x) = td.task_role_arn {
438 map.insert("taskRoleArn".into(), json!(x));
439 }
440 if let Some(ref x) = td.execution_role_arn {
441 map.insert("executionRoleArn".into(), json!(x));
442 }
443 if let Some(ref x) = td.pid_mode {
444 map.insert("pidMode".into(), json!(x));
445 }
446 if let Some(ref x) = td.ipc_mode {
447 map.insert("ipcMode".into(), json!(x));
448 }
449 if let Some(ref x) = td.proxy_configuration {
450 map.insert("proxyConfiguration".into(), x.clone());
451 }
452 if let Some(ref x) = td.ephemeral_storage {
453 map.insert("ephemeralStorage".into(), x.clone());
454 }
455 if let Some(ref x) = td.runtime_platform {
456 map.insert("runtimePlatform".into(), x.clone());
457 }
458 if let Some(ref x) = td.registered_by {
459 map.insert("registeredBy".into(), json!(x));
460 }
461 map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
462 if let Some(ts) = td.deregistered_at {
463 map.insert("deregisteredAt".into(), json!(ts.timestamp()));
464 }
465 if let Some(enabled) = td.enable_fault_injection {
466 map.insert("enableFaultInjection".into(), json!(enabled));
467 }
468 Value::Object(map)
469}
470
471fn resolve_service_key(state: &EcsState, tail: &str) -> Option<String> {
477 if tail.contains('/') {
478 return state.services.contains_key(tail).then(|| tail.to_string());
479 }
480 let suffix = format!("/{tail}");
481 state
482 .services
483 .keys()
484 .find(|k| k.ends_with(&suffix))
485 .cloned()
486}
487
488fn resolve_container_instance_key(state: &EcsState, tail: &str) -> Option<String> {
491 if tail.contains('/') {
492 return state
493 .container_instances
494 .contains_key(tail)
495 .then(|| tail.to_string());
496 }
497 let suffix = format!("/{tail}");
498 state
499 .container_instances
500 .keys()
501 .find(|k| k.ends_with(&suffix))
502 .cloned()
503}
504
505fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
509 let rest = arn
510 .strip_prefix("arn:aws:ecs:")
511 .ok_or_else(|| invalid_parameter(format!("Malformed ECS ARN: {arn}")))?;
512 let mut parts = rest.splitn(3, ':');
515 let _region = parts
516 .next()
517 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
518 let account = parts
519 .next()
520 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
521 let resource = parts
522 .next()
523 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
524 let (resource_type, tail) = resource
525 .split_once('/')
526 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
527 Ok((
528 account.to_string(),
529 resource_type.to_string(),
530 tail.to_string(),
531 ))
532}
533
/// Splits a `family:revision` reference into its parts.
///
/// The text after the last `:` is taken as the revision only when it parses as
/// an `i32`; otherwise the whole input is returned as the family with no
/// revision.
fn parse_family_revision(input: &str) -> (String, Option<i32>) {
    input
        .rsplit_once(':')
        .and_then(|(family, revision_text)| {
            revision_text
                .parse::<i32>()
                .ok()
                .map(|revision| (family.to_owned(), Some(revision)))
        })
        .unwrap_or_else(|| (input.to_owned(), None))
}
545
546fn resolve_task_definition_ref(
550 input: &str,
551) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
552 if input.starts_with("arn:aws:ecs:") {
553 let (account, resource_type, tail) = decode_ecs_arn(input)?;
554 if resource_type != "task-definition" {
555 return Err(invalid_parameter(format!(
556 "Expected task-definition ARN: {input}"
557 )));
558 }
559 let (family, rev) = parse_family_revision(&tail);
560 Ok((Some(account), family, rev))
561 } else {
562 let (family, rev) = parse_family_revision(input);
563 Ok((None, family, rev))
564 }
565}
566
567fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
568 if let Ok((Some(account), _, _)) = resolve_task_definition_ref(td_ref) {
569 account
570 } else {
571 request.account_id.clone()
572 }
573}
574
575fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
576 if let Some(input) = cluster_ref {
577 if input.starts_with("arn:aws:ecs:") {
578 if let Ok((account, _, _)) = decode_ecs_arn(input) {
579 return account;
580 }
581 }
582 }
583 request.account_id.clone()
584}
585
586fn latest_active_revision(
587 revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
588) -> Option<&TaskDefinition> {
589 revisions.values().rev().find(|td| td.status == "ACTIVE")
590}
591
592impl EcsService {
595 fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
596 let body = request.json_body();
597 let cluster_name = opt_str(&body, "clusterName")
598 .unwrap_or("default")
599 .to_string();
600 let tags = parse_tags(&body);
601 let settings: Vec<Value> = body
602 .get("settings")
603 .and_then(|v| v.as_array())
604 .cloned()
605 .unwrap_or_default();
606 let configuration = body.get("configuration").cloned();
607 let capacity_providers: Vec<String> = body
608 .get("capacityProviders")
609 .and_then(|v| v.as_array())
610 .map(|arr| {
611 arr.iter()
612 .filter_map(|v| v.as_str().map(String::from))
613 .collect()
614 })
615 .unwrap_or_default();
616 let default_strategy: Vec<Value> = body
617 .get("defaultCapacityProviderStrategy")
618 .and_then(|v| v.as_array())
619 .cloned()
620 .unwrap_or_default();
621 let service_connect = body.get("serviceConnectDefaults").cloned();
622
623 let account = request.account_id.clone();
624 let mut accounts = self.state.write();
625 let state = accounts.get_or_create(&account);
626 let arn = state.cluster_arn(&cluster_name);
627 let mut cluster = Cluster::new(&cluster_name, arn);
628 cluster.tags = tags;
629 cluster.settings = settings;
630 cluster.configuration = configuration;
631 cluster.capacity_providers = capacity_providers;
632 cluster.default_capacity_provider_strategy = default_strategy;
633 cluster.service_connect_defaults = service_connect;
634 state.clusters.insert(cluster_name.clone(), cluster.clone());
638
639 Ok(AwsResponse::ok_json(json!({
640 "cluster": cluster_to_json(&cluster),
641 })))
642 }
643
644 fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
645 let body = request.json_body();
646 let names: Vec<String> = body
647 .get("clusters")
648 .and_then(|v| v.as_array())
649 .map(|arr| {
650 arr.iter()
651 .filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
652 .collect()
653 })
654 .unwrap_or_else(|| vec!["default".to_string()]);
655
656 let account = request.account_id.clone();
657 let accounts = self.state.read();
658 let mut found = Vec::new();
659 let mut failures = Vec::new();
660 if let Some(state) = accounts.get(&account) {
661 for name in &names {
662 match state.clusters.get(name) {
663 Some(c) => found.push(cluster_to_json(c)),
664 None => failures.push(json!({
665 "arn": state.cluster_arn(name),
666 "reason": "MISSING",
667 })),
668 }
669 }
670 } else {
671 for name in &names {
672 failures.push(json!({
673 "arn": format!(
674 "arn:aws:ecs:{}:{}:cluster/{}",
675 accounts.region(),
676 account,
677 name
678 ),
679 "reason": "MISSING",
680 }));
681 }
682 }
683 Ok(AwsResponse::ok_json(json!({
684 "clusters": found,
685 "failures": failures,
686 })))
687 }
688
689 fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
690 let body = request.json_body();
691 let cluster_ref = opt_str(&body, "cluster");
692 let name = EcsState::resolve_cluster_name(cluster_ref);
693 let account = target_account_for_cluster(request, cluster_ref);
694
695 let mut accounts = self.state.write();
696 let state = accounts.get_or_create(&account);
697 let cluster = state
698 .clusters
699 .get_mut(&name)
700 .ok_or_else(|| cluster_not_found(&name))?;
701 if cluster.active_services_count > 0 {
702 return Err(cluster_contains_services());
703 }
704 if cluster.running_tasks_count > 0 || cluster.pending_tasks_count > 0 {
705 return Err(cluster_contains_tasks());
706 }
707 cluster.status = "INACTIVE".to_string();
708 let snapshot = cluster.clone();
709 state.clusters.remove(&name);
714 Ok(AwsResponse::ok_json(json!({
715 "cluster": cluster_to_json(&snapshot),
716 })))
717 }
718
719 fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
720 let body = request.json_body();
721 let max_results = body
722 .get("maxResults")
723 .and_then(|v| v.as_i64())
724 .filter(|n| (1..=100).contains(n))
725 .map(|n| n as usize)
726 .unwrap_or(100);
727 let next_token = opt_str(&body, "nextToken").unwrap_or("");
728
729 let account = request.account_id.clone();
730 let accounts = self.state.read();
731 let arns: Vec<String> = match accounts.get(&account) {
732 Some(state) => state
733 .clusters
734 .values()
735 .map(|c| c.cluster_arn.clone())
736 .collect(),
737 None => Vec::new(),
738 };
739 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
740 let end = (start + max_results).min(arns.len());
741 let page = arns[start..end].to_vec();
742 let next = if end < arns.len() {
743 Some(end.to_string())
744 } else {
745 None
746 };
747 let mut out = json!({ "clusterArns": page });
748 if let Some(n) = next {
749 out.as_object_mut()
750 .unwrap()
751 .insert("nextToken".into(), json!(n));
752 }
753 Ok(AwsResponse::ok_json(out))
754 }
755
756 fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
757 let body = request.json_body();
758 let cluster_ref = req_str(&body, "cluster")?;
759 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
760 let account = target_account_for_cluster(request, Some(cluster_ref));
761
762 let mut accounts = self.state.write();
763 let state = accounts.get_or_create(&account);
764 let cluster = state
765 .clusters
766 .get_mut(&name)
767 .ok_or_else(|| cluster_not_found(&name))?;
768 if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
769 cluster.settings = settings.clone();
770 }
771 if let Some(cfg) = body.get("configuration") {
772 cluster.configuration = Some(cfg.clone());
773 }
774 if let Some(sc) = body.get("serviceConnectDefaults") {
775 cluster.service_connect_defaults = Some(sc.clone());
776 }
777 let snapshot = cluster.clone();
778 Ok(AwsResponse::ok_json(json!({
779 "cluster": cluster_to_json(&snapshot),
780 })))
781 }
782
783 fn update_cluster_settings(
784 &self,
785 request: &AwsRequest,
786 ) -> Result<AwsResponse, AwsServiceError> {
787 let body = request.json_body();
788 let cluster_ref = req_str(&body, "cluster")?;
789 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
790 let account = target_account_for_cluster(request, Some(cluster_ref));
791 let settings: Vec<Value> = body
792 .get("settings")
793 .and_then(|v| v.as_array())
794 .cloned()
795 .unwrap_or_default();
796
797 let mut accounts = self.state.write();
798 let state = accounts.get_or_create(&account);
799 let cluster = state
800 .clusters
801 .get_mut(&name)
802 .ok_or_else(|| cluster_not_found(&name))?;
803 cluster.settings = settings;
804 let snapshot = cluster.clone();
805 Ok(AwsResponse::ok_json(json!({
806 "cluster": cluster_to_json(&snapshot),
807 })))
808 }
809
810 fn put_cluster_capacity_providers(
811 &self,
812 request: &AwsRequest,
813 ) -> Result<AwsResponse, AwsServiceError> {
814 let body = request.json_body();
815 let cluster_ref = req_str(&body, "cluster")?;
816 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
817 let account = target_account_for_cluster(request, Some(cluster_ref));
818 let capacity_providers: Vec<String> = body
819 .get("capacityProviders")
820 .and_then(|v| v.as_array())
821 .map(|arr| {
822 arr.iter()
823 .filter_map(|v| v.as_str().map(String::from))
824 .collect()
825 })
826 .ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
827 let default_strategy: Vec<Value> = body
828 .get("defaultCapacityProviderStrategy")
829 .and_then(|v| v.as_array())
830 .cloned()
831 .ok_or_else(|| {
832 client_exception("Missing required field: defaultCapacityProviderStrategy")
833 })?;
834
835 let mut accounts = self.state.write();
836 let state = accounts.get_or_create(&account);
837 let cluster = state
838 .clusters
839 .get_mut(&name)
840 .ok_or_else(|| cluster_not_found(&name))?;
841 cluster.capacity_providers = capacity_providers;
842 cluster.default_capacity_provider_strategy = default_strategy;
843 let snapshot = cluster.clone();
844 Ok(AwsResponse::ok_json(json!({
845 "cluster": cluster_to_json(&snapshot),
846 })))
847 }
848}
849
850impl EcsService {
853 fn register_task_definition(
854 &self,
855 request: &AwsRequest,
856 ) -> Result<AwsResponse, AwsServiceError> {
857 let body = request.json_body();
858 let family = req_str(&body, "family")?.to_string();
859 validate_family_name(&family)?;
860 let container_definitions = body
861 .get("containerDefinitions")
862 .and_then(|v| v.as_array())
863 .cloned()
864 .ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
865 if container_definitions.is_empty() {
866 return Err(client_exception(
867 "Task definition must have at least one container",
868 ));
869 }
870 for cd in &container_definitions {
871 if cd
872 .get("name")
873 .and_then(|v| v.as_str())
874 .unwrap_or("")
875 .is_empty()
876 {
877 return Err(client_exception(
878 "Container definition is missing required field: name",
879 ));
880 }
881 if cd
882 .get("image")
883 .and_then(|v| v.as_str())
884 .unwrap_or("")
885 .is_empty()
886 {
887 return Err(client_exception(
888 "Container definition is missing required field: image",
889 ));
890 }
891 }
892 if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
896 self.check_pass_role(&request.account_id, role_arn)?;
897 }
898 if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
899 self.check_pass_role(&request.account_id, role_arn)?;
900 }
901
902 let tags = parse_tags(&body);
903 let requires_compatibilities: Vec<String> = body
904 .get("requiresCompatibilities")
905 .and_then(|v| v.as_array())
906 .map(|arr| {
907 arr.iter()
908 .filter_map(|v| v.as_str().map(String::from))
909 .collect()
910 })
911 .unwrap_or_default();
912 let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
916
917 let account = request.account_id.clone();
918 let mut accounts = self.state.write();
919 let state = accounts.get_or_create(&account);
920 let revision = state.allocate_revision(&family);
921 let arn = state.task_definition_arn(&family, revision);
922 let td = TaskDefinition {
923 family: family.clone(),
924 revision,
925 task_definition_arn: arn,
926 container_definitions,
927 status: "ACTIVE".to_string(),
928 task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
929 execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
930 network_mode: opt_str(&body, "networkMode").map(String::from),
931 requires_compatibilities,
932 compatibilities,
933 cpu: opt_str(&body, "cpu").map(String::from),
934 memory: opt_str(&body, "memory").map(String::from),
935 pid_mode: opt_str(&body, "pidMode").map(String::from),
936 ipc_mode: opt_str(&body, "ipcMode").map(String::from),
937 volumes: body
938 .get("volumes")
939 .and_then(|v| v.as_array())
940 .cloned()
941 .unwrap_or_default(),
942 placement_constraints: body
943 .get("placementConstraints")
944 .and_then(|v| v.as_array())
945 .cloned()
946 .unwrap_or_default(),
947 proxy_configuration: body.get("proxyConfiguration").cloned(),
948 inference_accelerators: body
949 .get("inferenceAccelerators")
950 .and_then(|v| v.as_array())
951 .cloned()
952 .unwrap_or_default(),
953 ephemeral_storage: body.get("ephemeralStorage").cloned(),
954 runtime_platform: body.get("runtimePlatform").cloned(),
955 requires_attributes: Vec::new(),
956 registered_at: Utc::now(),
957 registered_by: request
958 .principal
959 .as_ref()
960 .map(|p| p.arn.clone())
961 .or(Some(format!("arn:aws:iam::{}:root", state.account_id))),
962 deregistered_at: None,
963 tags: tags.clone(),
964 enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
965 };
966 let td_json = task_definition_to_json(&td);
967 state
968 .task_definitions
969 .entry(family.clone())
970 .or_default()
971 .insert(revision, td);
972
973 Ok(AwsResponse::ok_json(json!({
974 "taskDefinition": td_json,
975 "tags": tags_json(&tags),
976 })))
977 }
978
979 fn describe_task_definition(
980 &self,
981 request: &AwsRequest,
982 ) -> Result<AwsResponse, AwsServiceError> {
983 let body = request.json_body();
984 let td_ref = req_str(&body, "taskDefinition")?;
985 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
986 let include_tags = body
987 .get("include")
988 .and_then(|v| v.as_array())
989 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
990 .unwrap_or(false);
991
992 let account = target_account_for_task_definition(request, td_ref);
993 let accounts = self.state.read();
994 let state = accounts
995 .get(&account)
996 .ok_or_else(|| task_definition_not_found(td_ref))?;
997 let revisions = state
998 .task_definitions
999 .get(&family)
1000 .ok_or_else(|| task_definition_not_found(td_ref))?;
1001 let td = match rev {
1002 Some(n) => revisions
1003 .get(&n)
1004 .ok_or_else(|| task_definition_not_found(td_ref))?,
1005 None => latest_active_revision(revisions)
1006 .ok_or_else(|| task_definition_not_found(td_ref))?,
1007 };
1008 let mut out = json!({"taskDefinition": task_definition_to_json(td)});
1009 if include_tags {
1010 out.as_object_mut()
1011 .unwrap()
1012 .insert("tags".into(), tags_json(&td.tags));
1013 }
1014 Ok(AwsResponse::ok_json(out))
1015 }
1016
1017 fn deregister_task_definition(
1018 &self,
1019 request: &AwsRequest,
1020 ) -> Result<AwsResponse, AwsServiceError> {
1021 let body = request.json_body();
1022 let td_ref = req_str(&body, "taskDefinition")?;
1023 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1024 let rev =
1025 rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
1026
1027 let account = target_account_for_task_definition(request, td_ref);
1028 let mut accounts = self.state.write();
1029 let state = accounts.get_or_create(&account);
1030 let revisions = state
1031 .task_definitions
1032 .get_mut(&family)
1033 .ok_or_else(|| task_definition_not_found(td_ref))?;
1034 let td = revisions
1035 .get_mut(&rev)
1036 .ok_or_else(|| task_definition_not_found(td_ref))?;
1037 td.status = "INACTIVE".to_string();
1038 td.deregistered_at = Some(Utc::now());
1039 let snapshot = td.clone();
1040 Ok(AwsResponse::ok_json(json!({
1041 "taskDefinition": task_definition_to_json(&snapshot),
1042 })))
1043 }
1044
1045 fn delete_task_definitions(
1046 &self,
1047 request: &AwsRequest,
1048 ) -> Result<AwsResponse, AwsServiceError> {
1049 let body = request.json_body();
1050 let refs: Vec<String> = body
1051 .get("taskDefinitions")
1052 .and_then(|v| v.as_array())
1053 .map(|arr| {
1054 arr.iter()
1055 .filter_map(|v| v.as_str().map(String::from))
1056 .collect()
1057 })
1058 .ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
1059 if refs.is_empty() {
1060 return Err(client_exception("taskDefinitions must not be empty"));
1061 }
1062
1063 let mut deleted = Vec::new();
1064 let mut failures = Vec::new();
1065 let account = request.account_id.clone();
1066 let mut accounts = self.state.write();
1067 let state = accounts.get_or_create(&account);
1068 for input in refs {
1069 let parsed = match resolve_task_definition_ref(&input) {
1070 Ok((_, family, Some(rev))) => Some((family, rev)),
1071 Ok(_) => None,
1072 Err(_) => None,
1073 };
1074 let Some((family, rev)) = parsed else {
1075 failures.push(json!({
1076 "arn": input,
1077 "reason": "INVALID_REFERENCE",
1078 "detail": "Expected family:revision or full task-definition ARN",
1079 }));
1080 continue;
1081 };
1082 let Some(revisions) = state.task_definitions.get_mut(&family) else {
1083 failures.push(json!({"arn": input, "reason": "MISSING"}));
1084 continue;
1085 };
1086 let Some(td) = revisions.get_mut(&rev) else {
1087 failures.push(json!({"arn": input, "reason": "MISSING"}));
1088 continue;
1089 };
1090 if td.status == "ACTIVE" {
1091 failures.push(json!({
1092 "arn": td.task_definition_arn.clone(),
1093 "reason": "MUST_BE_INACTIVE",
1094 "detail": "Task definitions must be deregistered before they can be deleted",
1095 }));
1096 continue;
1097 }
1098 td.status = "DELETE_IN_PROGRESS".to_string();
1099 deleted.push(task_definition_to_json(td));
1100 }
1101 Ok(AwsResponse::ok_json(json!({
1102 "taskDefinitions": deleted,
1103 "failures": failures,
1104 })))
1105 }
1106
1107 fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1108 let body = request.json_body();
1109 let family_prefix = opt_str(&body, "familyPrefix");
1110 let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1111 let sort = opt_str(&body, "sort").unwrap_or("ASC");
1112 let max_results = body
1113 .get("maxResults")
1114 .and_then(|v| v.as_i64())
1115 .filter(|n| (1..=100).contains(n))
1116 .map(|n| n as usize)
1117 .unwrap_or(100);
1118 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1119
1120 let account = request.account_id.clone();
1121 let accounts = self.state.read();
1122 let mut arns: Vec<String> = Vec::new();
1123 if let Some(state) = accounts.get(&account) {
1124 for (family, revisions) in &state.task_definitions {
1125 if let Some(prefix) = family_prefix {
1126 if !family.starts_with(prefix) {
1127 continue;
1128 }
1129 }
1130 for td in revisions.values() {
1131 if td.status == status {
1132 arns.push(td.task_definition_arn.clone());
1133 }
1134 }
1135 }
1136 }
1137 if sort == "DESC" {
1138 arns.sort();
1139 arns.reverse();
1140 } else {
1141 arns.sort();
1142 }
1143 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1144 let end = (start + max_results).min(arns.len());
1145 let page = arns[start..end].to_vec();
1146 let mut out = json!({"taskDefinitionArns": page});
1147 if end < arns.len() {
1148 out.as_object_mut()
1149 .unwrap()
1150 .insert("nextToken".into(), json!(end.to_string()));
1151 }
1152 Ok(AwsResponse::ok_json(out))
1153 }
1154
1155 fn list_task_definition_families(
1156 &self,
1157 request: &AwsRequest,
1158 ) -> Result<AwsResponse, AwsServiceError> {
1159 let body = request.json_body();
1160 let family_prefix = opt_str(&body, "familyPrefix");
1161 let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1162 let max_results = body
1163 .get("maxResults")
1164 .and_then(|v| v.as_i64())
1165 .filter(|n| (1..=100).contains(n))
1166 .map(|n| n as usize)
1167 .unwrap_or(100);
1168 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1169
1170 let account = request.account_id.clone();
1171 let accounts = self.state.read();
1172 let mut families: Vec<String> = Vec::new();
1173 if let Some(state) = accounts.get(&account) {
1174 for (family, revisions) in &state.task_definitions {
1175 if let Some(prefix) = family_prefix {
1176 if !family.starts_with(prefix) {
1177 continue;
1178 }
1179 }
1180 let matches_status = match status {
1181 "ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
1182 "INACTIVE" => revisions
1183 .values()
1184 .all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
1185 "ALL" => true,
1186 _ => revisions.values().any(|td| td.status == status),
1187 };
1188 if matches_status {
1189 families.push(family.clone());
1190 }
1191 }
1192 }
1193 families.sort();
1194 let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
1195 let end = (start + max_results).min(families.len());
1196 let page = families[start..end].to_vec();
1197 let mut out = json!({"families": page});
1198 if end < families.len() {
1199 out.as_object_mut()
1200 .unwrap()
1201 .insert("nextToken".into(), json!(end.to_string()));
1202 }
1203 Ok(AwsResponse::ok_json(out))
1204 }
1205}
1206
1207fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
1208 if family.is_empty() || family.len() > 255 {
1209 return Err(invalid_parameter(
1210 "Task definition family must be 1-255 characters",
1211 ));
1212 }
1213 let ok = family
1214 .chars()
1215 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
1216 if !ok {
1217 return Err(invalid_parameter(
1218 "Task definition family may only contain letters, numbers, hyphens, and underscores",
1219 ));
1220 }
1221 Ok(())
1222}
1223
1224impl EcsService {
1227 fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1228 let body = request.json_body();
1229 let arn = req_str(&body, "resourceArn")?.to_string();
1230 let tags = parse_tags(&body);
1231 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1232 let mut accounts = self.state.write();
1233 let state = accounts.get_or_create(&account);
1234 match resource_type.as_str() {
1235 "cluster" => {
1236 let cluster = state
1237 .clusters
1238 .get_mut(&tail)
1239 .ok_or_else(|| resource_not_found(&arn))?;
1240 merge_tags(&mut cluster.tags, tags);
1241 }
1242 "task-definition" => {
1243 let (family, rev) = parse_family_revision(&tail);
1244 let rev = rev.ok_or_else(|| {
1245 invalid_parameter("task-definition ARN must include revision")
1246 })?;
1247 let td = state
1248 .task_definitions
1249 .get_mut(&family)
1250 .and_then(|m| m.get_mut(&rev))
1251 .ok_or_else(|| resource_not_found(&arn))?;
1252 merge_tags(&mut td.tags, tags);
1253 }
1254 "service" => {
1255 let key =
1256 resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1257 let svc = state.services.get_mut(&key).expect("resolved key exists");
1258 merge_tags(&mut svc.tags, tags);
1259 }
1260 "task" => {
1261 let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1262 let task = state
1263 .tasks
1264 .get_mut(&task_id)
1265 .ok_or_else(|| resource_not_found(&arn))?;
1266 merge_tags(&mut task.tags, tags);
1267 }
1268 "task-set" => {
1269 let ts = state
1270 .task_sets
1271 .get_mut(&tail)
1272 .ok_or_else(|| resource_not_found(&arn))?;
1273 merge_tags(&mut ts.tags, tags);
1274 }
1275 "container-instance" => {
1276 let key = resolve_container_instance_key(state, &tail)
1277 .ok_or_else(|| resource_not_found(&arn))?;
1278 let ci = state
1279 .container_instances
1280 .get_mut(&key)
1281 .expect("resolved key exists");
1282 merge_tags(&mut ci.tags, tags);
1283 }
1284 "capacity-provider" => {
1285 let cp = state
1286 .capacity_providers
1287 .get_mut(&tail)
1288 .ok_or_else(|| resource_not_found(&arn))?;
1289 merge_tags(&mut cp.tags, tags);
1290 }
1291 other => {
1292 return Err(invalid_parameter(format!(
1293 "Unknown ECS resource type: {other}"
1294 )));
1295 }
1296 }
1297 Ok(AwsResponse::ok_json(json!({})))
1298 }
1299
1300 fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1301 let body = request.json_body();
1302 let arn = req_str(&body, "resourceArn")?.to_string();
1303 let keys: Vec<String> = body
1304 .get("tagKeys")
1305 .and_then(|v| v.as_array())
1306 .map(|arr| {
1307 arr.iter()
1308 .filter_map(|v| v.as_str().map(String::from))
1309 .collect()
1310 })
1311 .unwrap_or_default();
1312 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1313 let mut accounts = self.state.write();
1314 let state = accounts.get_or_create(&account);
1315 match resource_type.as_str() {
1316 "cluster" => {
1317 let cluster = state
1318 .clusters
1319 .get_mut(&tail)
1320 .ok_or_else(|| resource_not_found(&arn))?;
1321 cluster.tags.retain(|t| !keys.contains(&t.key));
1322 }
1323 "task-definition" => {
1324 let (family, rev) = parse_family_revision(&tail);
1325 let rev = rev.ok_or_else(|| {
1326 invalid_parameter("task-definition ARN must include revision")
1327 })?;
1328 let td = state
1329 .task_definitions
1330 .get_mut(&family)
1331 .and_then(|m| m.get_mut(&rev))
1332 .ok_or_else(|| resource_not_found(&arn))?;
1333 td.tags.retain(|t| !keys.contains(&t.key));
1334 }
1335 "service" => {
1336 let key =
1337 resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1338 let svc = state.services.get_mut(&key).expect("resolved key exists");
1339 svc.tags.retain(|t| !keys.contains(&t.key));
1340 }
1341 "task" => {
1342 let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1343 let task = state
1344 .tasks
1345 .get_mut(&task_id)
1346 .ok_or_else(|| resource_not_found(&arn))?;
1347 task.tags.retain(|t| !keys.contains(&t.key));
1348 }
1349 "task-set" => {
1350 let ts = state
1351 .task_sets
1352 .get_mut(&tail)
1353 .ok_or_else(|| resource_not_found(&arn))?;
1354 ts.tags.retain(|t| !keys.contains(&t.key));
1355 }
1356 "container-instance" => {
1357 let key = resolve_container_instance_key(state, &tail)
1358 .ok_or_else(|| resource_not_found(&arn))?;
1359 let ci = state
1360 .container_instances
1361 .get_mut(&key)
1362 .expect("resolved key exists");
1363 ci.tags.retain(|t| !keys.contains(&t.key));
1364 }
1365 "capacity-provider" => {
1366 let cp = state
1367 .capacity_providers
1368 .get_mut(&tail)
1369 .ok_or_else(|| resource_not_found(&arn))?;
1370 cp.tags.retain(|t| !keys.contains(&t.key));
1371 }
1372 other => {
1373 return Err(invalid_parameter(format!(
1374 "Unknown ECS resource type: {other}"
1375 )));
1376 }
1377 }
1378 Ok(AwsResponse::ok_json(json!({})))
1379 }
1380
1381 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1382 let body = request.json_body();
1383 let arn = req_str(&body, "resourceArn")?.to_string();
1384 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1385 let accounts = self.state.read();
1386 let state = accounts
1387 .get(&account)
1388 .ok_or_else(|| resource_not_found(&arn))?;
1389 let tags = match resource_type.as_str() {
1390 "cluster" => state
1391 .clusters
1392 .get(&tail)
1393 .map(|c| c.tags.clone())
1394 .ok_or_else(|| resource_not_found(&arn))?,
1395 "task-definition" => {
1396 let (family, rev) = parse_family_revision(&tail);
1397 let rev = rev.ok_or_else(|| {
1398 invalid_parameter("task-definition ARN must include revision")
1399 })?;
1400 state
1401 .task_definitions
1402 .get(&family)
1403 .and_then(|m| m.get(&rev))
1404 .map(|td| td.tags.clone())
1405 .ok_or_else(|| resource_not_found(&arn))?
1406 }
1407 "service" => {
1408 let key =
1409 resolve_service_key(state, &tail).ok_or_else(|| resource_not_found(&arn))?;
1410 state
1411 .services
1412 .get(&key)
1413 .map(|s| s.tags.clone())
1414 .expect("resolved key exists")
1415 }
1416 "task" => {
1417 let task_id = tail.rsplit('/').next().unwrap_or(&tail).to_string();
1418 state
1419 .tasks
1420 .get(&task_id)
1421 .map(|t| t.tags.clone())
1422 .ok_or_else(|| resource_not_found(&arn))?
1423 }
1424 "task-set" => state
1425 .task_sets
1426 .get(&tail)
1427 .map(|t| t.tags.clone())
1428 .ok_or_else(|| resource_not_found(&arn))?,
1429 "container-instance" => {
1430 let key = resolve_container_instance_key(state, &tail)
1431 .ok_or_else(|| resource_not_found(&arn))?;
1432 state
1433 .container_instances
1434 .get(&key)
1435 .map(|c| c.tags.clone())
1436 .expect("resolved key exists")
1437 }
1438 "capacity-provider" => state
1439 .capacity_providers
1440 .get(&tail)
1441 .map(|c| c.tags.clone())
1442 .ok_or_else(|| resource_not_found(&arn))?,
1443 other => {
1444 return Err(invalid_parameter(format!(
1445 "Unknown ECS resource type: {other}"
1446 )));
1447 }
1448 };
1449 Ok(AwsResponse::ok_json(json!({"tags": tags_json(&tags)})))
1450 }
1451}
1452
1453fn resource_not_found(arn: &str) -> AwsServiceError {
1454 AwsServiceError::aws_error(
1455 StatusCode::BAD_REQUEST,
1456 "ClientException",
1457 format!("The referenced resource was not found: {arn}"),
1458 )
1459}
1460
1461impl EcsService {
1464 fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1465 let body = request.json_body();
1466 let name = req_str(&body, "name")?.to_string();
1467 let value = req_str(&body, "value")?.to_string();
1468 let principal_arn = opt_str(&body, "principalArn")
1469 .map(String::from)
1470 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1471 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1472 let account = request.account_id.clone();
1473 let mut accounts = self.state.write();
1474 let state = accounts.get_or_create(&account);
1475 state
1476 .principal_account_settings
1477 .entry(principal_arn.clone())
1478 .or_default()
1479 .insert(name.clone(), value.clone());
1480 Ok(AwsResponse::ok_json(json!({
1481 "setting": {
1482 "name": name,
1483 "value": value,
1484 "principalArn": principal_arn,
1485 }
1486 })))
1487 }
1488
1489 fn put_account_setting_default(
1490 &self,
1491 request: &AwsRequest,
1492 ) -> Result<AwsResponse, AwsServiceError> {
1493 let body = request.json_body();
1494 let name = req_str(&body, "name")?.to_string();
1495 let value = req_str(&body, "value")?.to_string();
1496 let account = request.account_id.clone();
1497 let mut accounts = self.state.write();
1498 let state = accounts.get_or_create(&account);
1499 state
1500 .account_setting_defaults
1501 .insert(name.clone(), value.clone());
1502 Ok(AwsResponse::ok_json(json!({
1503 "setting": {
1504 "name": name,
1505 "value": value,
1506 "principalArn": format!("arn:aws:iam::{}:root", state.account_id),
1507 }
1508 })))
1509 }
1510
1511 fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1512 let body = request.json_body();
1513 let name = req_str(&body, "name")?.to_string();
1514 let principal_arn = opt_str(&body, "principalArn")
1515 .map(String::from)
1516 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1517 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1518 let account = request.account_id.clone();
1519 let mut accounts = self.state.write();
1520 let state = accounts.get_or_create(&account);
1521 let removed_value = state
1522 .principal_account_settings
1523 .get_mut(&principal_arn)
1524 .and_then(|m| m.remove(&name));
1525 Ok(AwsResponse::ok_json(json!({
1526 "setting": {
1527 "name": name,
1528 "value": removed_value.unwrap_or_default(),
1529 "principalArn": principal_arn,
1530 }
1531 })))
1532 }
1533
1534 fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1535 let body = request.json_body();
1536 let name_filter = opt_str(&body, "name");
1537 let value_filter = opt_str(&body, "value");
1538 let principal_filter = opt_str(&body, "principalArn");
1539 let effective_only = body
1540 .get("effectiveSettings")
1541 .and_then(|v| v.as_bool())
1542 .unwrap_or(false);
1543
1544 let account = request.account_id.clone();
1545 let accounts = self.state.read();
1546 let Some(state) = accounts.get(&account) else {
1547 return Ok(AwsResponse::ok_json(json!({"settings": []})));
1548 };
1549 let root_arn = format!("arn:aws:iam::{}:root", state.account_id);
1550 let mut settings: Vec<Value> = Vec::new();
1551
1552 if effective_only {
1553 let principal = principal_filter
1556 .map(String::from)
1557 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1558 .unwrap_or_else(|| root_arn.clone());
1559 let mut merged = state.account_setting_defaults.clone();
1560 if let Some(overrides) = state.principal_account_settings.get(&principal) {
1561 for (k, v) in overrides {
1562 merged.insert(k.clone(), v.clone());
1563 }
1564 }
1565 for (k, v) in merged {
1566 if matches_filter(name_filter, &k) && matches_filter(value_filter, &v) {
1567 settings.push(json!({
1568 "name": k,
1569 "value": v,
1570 "principalArn": principal,
1571 }));
1572 }
1573 }
1574 } else {
1575 for (k, v) in &state.account_setting_defaults {
1578 if matches_filter(name_filter, k)
1579 && matches_filter(value_filter, v)
1580 && (principal_filter.is_none() || principal_filter == Some(root_arn.as_str()))
1581 {
1582 settings.push(json!({
1583 "name": k,
1584 "value": v,
1585 "principalArn": root_arn,
1586 }));
1587 }
1588 }
1589 for (principal, entries) in &state.principal_account_settings {
1590 if principal_filter.is_some_and(|pf| pf != principal) {
1591 continue;
1592 }
1593 for (k, v) in entries {
1594 if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
1595 settings.push(json!({
1596 "name": k,
1597 "value": v,
1598 "principalArn": principal,
1599 }));
1600 }
1601 }
1602 }
1603 }
1604
1605 Ok(AwsResponse::ok_json(json!({"settings": settings})))
1606 }
1607}
1608
/// Returns `true` when no filter is given or when the filter equals `value`
/// exactly (case-sensitive).
fn matches_filter(filter: Option<&str>, value: &str) -> bool {
    match filter {
        Some(expected) => expected == value,
        None => true,
    }
}
1612
1613impl EcsService {
    /// Handles `RunTask`: records `count` new tasks for a task definition and
    /// hands them to the container runtime when one is configured.
    ///
    /// Request fields read from the JSON body:
    ///
    /// * `taskDefinition` (required) — a revision-less reference resolves
    ///   through `latest_active_revision`; the chosen revision must be
    ///   `ACTIVE`.
    /// * `cluster` — resolved by `EcsState::resolve_cluster_name`.
    /// * `launchType` — defaults to `FARGATE`.
    /// * `count` — honoured only within `1..=10`; anything else means 1.
    /// * `group`, `startedBy`, `tags` — copied onto every task.
    /// * `overrides` — stored verbatim on the task; its `cpu` / `memory`
    ///   strings take precedence over the task definition's values, and its
    ///   `taskRoleArn` / `executionRoleArn` are passed to `check_pass_role`.
    ///
    /// The response lists the created tasks under `tasks` and always carries
    /// an empty `failures` array.
    ///
    /// # Errors
    /// Propagates failures from `req_str`, `check_pass_role`, and
    /// `resolve_task_definition_ref`; returns `task_definition_not_found`
    /// when the family or revision is unknown, and a client exception when
    /// the resolved revision is not `ACTIVE`.
    fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = request.json_body();
        let td_ref = req_str(&body, "taskDefinition")?;
        let cluster_ref = opt_str(&body, "cluster");
        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
        let launch_type = opt_str(&body, "launchType")
            .unwrap_or("FARGATE")
            .to_string();
        // Out-of-range or non-integer counts silently fall back to one task.
        let count = body
            .get("count")
            .and_then(|v| v.as_i64())
            .filter(|n| (1..=10).contains(n))
            .unwrap_or(1) as usize;
        let group = opt_str(&body, "group").map(String::from);
        let started_by = opt_str(&body, "startedBy").map(String::from);
        let tags = parse_tags(&body);

        // Role overrides are checked before the state lock is taken, so a
        // rejected request leaves the state untouched.
        if let Some(overrides) = body.get("overrides") {
            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
                self.check_pass_role(&request.account_id, role_arn)?;
            }
            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
                self.check_pass_role(&request.account_id, role_arn)?;
            }
        }

        let account = request.account_id.clone();
        let runtime = self.runtime.clone();
        let mut accounts = self.state.write();
        let state = accounts.get_or_create(&account);
        // An unregistered cluster is not an error here: its ARN is derived
        // from the name instead.
        let cluster_arn = state
            .clusters
            .get(&cluster_name)
            .map(|c| c.cluster_arn.clone())
            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
        let revisions = state
            .task_definitions
            .get(&family)
            .ok_or_else(|| task_definition_not_found(td_ref))?;
        let td = match rev {
            Some(n) => revisions
                .get(&n)
                .ok_or_else(|| task_definition_not_found(td_ref))?,
            None => latest_active_revision(revisions)
                .ok_or_else(|| task_definition_not_found(td_ref))?,
        };
        if td.status != "ACTIVE" {
            return Err(client_exception(format!(
                "Task definition {} is not ACTIVE",
                td.task_definition_arn
            )));
        }
        // Copy what the loop needs out of the task definition so the borrow
        // of `state` ends before `state` is mutated below.
        let td_arn = td.task_definition_arn.clone();
        let td_family = td.family.clone();
        let td_revision = td.revision;
        let td_cpu = td.cpu.clone();
        let td_memory = td.memory.clone();
        let td_task_role = td.task_role_arn.clone();
        let td_exec_role = td.execution_role_arn.clone();
        let td_containers = td.container_definitions.clone();

        let mut spawned_tasks: Vec<String> = Vec::new();
        let mut task_jsons: Vec<Value> = Vec::new();
        for _ in 0..count {
            // Task ids are hyphen-less v4 UUIDs.
            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
            let task_arn = state.task_arn(&cluster_name, &task_id);
            // One container record per container definition; a definition
            // without a `name` is reported as "app".
            let containers: Vec<Container> = td_containers
                .iter()
                .map(|def| Container {
                    container_arn: format!(
                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
                        state.region,
                        state.account_id,
                        cluster_name,
                        task_id,
                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
                    ),
                    name: def
                        .get("name")
                        .and_then(|v| v.as_str())
                        .unwrap_or("app")
                        .to_string(),
                    image: def
                        .get("image")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string(),
                    task_arn: task_arn.clone(),
                    last_status: "PENDING".into(),
                    exit_code: None,
                    reason: None,
                    runtime_id: None,
                    essential: def
                        .get("essential")
                        .and_then(|v| v.as_bool())
                        .unwrap_or(true),
                    cpu: def
                        .get("cpu")
                        .and_then(|v| v.as_i64())
                        .map(|n| n.to_string()),
                    memory: def
                        .get("memory")
                        .and_then(|v| v.as_i64())
                        .map(|n| n.to_string()),
                    memory_reservation: def
                        .get("memoryReservation")
                        .and_then(|v| v.as_i64())
                        .map(|n| n.to_string()),
                    network_bindings: Vec::new(),
                    network_interfaces: Vec::new(),
                    health_status: Some("UNKNOWN".to_string()),
                    managed_agents: None,
                })
                .collect();
            // Only the first container that is named, uses the `awslogs`
            // driver, and sets `awslogs-group` contributes a log config.
            let awslogs = td_containers.iter().find_map(|def| {
                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
                let log_cfg = def.get("logConfiguration")?;
                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
                    return None;
                }
                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
                Some(AwsLogsConfig {
                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
                    stream_prefix: opts
                        .get("awslogs-stream-prefix")
                        .and_then(|v| v.as_str())
                        .map(String::from),
                    region: opts
                        .get("awslogs-region")
                        .and_then(|v| v.as_str())
                        .unwrap_or(&state.region)
                        .to_string(),
                    container_name: name,
                })
            });
            let task = Task {
                task_arn: task_arn.clone(),
                task_id: task_id.clone(),
                cluster_arn: cluster_arn.clone(),
                cluster_name: cluster_name.clone(),
                task_definition_arn: td_arn.clone(),
                family: td_family.clone(),
                revision: td_revision,
                last_status: "PROVISIONING".into(),
                desired_status: "RUNNING".into(),
                launch_type: launch_type.clone(),
                platform_version: Some("1.4.0".into()),
                // Task-level cpu/memory overrides win over the task definition.
                cpu: body
                    .get("overrides")
                    .and_then(|v| v.get("cpu"))
                    .and_then(|v| v.as_str())
                    .map(String::from)
                    .or_else(|| td_cpu.clone()),
                memory: body
                    .get("overrides")
                    .and_then(|v| v.get("memory"))
                    .and_then(|v| v.as_str())
                    .map(String::from)
                    .or_else(|| td_memory.clone()),
                containers,
                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
                started_by: started_by.clone(),
                group: group.clone(),
                connectivity: "CONNECTING".into(),
                stop_code: None,
                stopped_reason: None,
                created_at: Utc::now(),
                started_at: None,
                stopping_at: None,
                stopped_at: None,
                pull_started_at: None,
                pull_stopped_at: None,
                connectivity_at: None,
                started_by_ref_id: None,
                execution_role_arn: td_exec_role.clone(),
                task_role_arn: td_task_role.clone(),
                tags: tags.clone(),
                awslogs,
                captured_logs: String::new(),
                protection: None,
            };
            state.tasks.insert(task_id.clone(), task.clone());
            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
                cluster.pending_tasks_count += 1;
            }
            // The stored copy advances to PENDING immediately, while the
            // response below is rendered from the local `task`, which still
            // reads PROVISIONING.
            if let Some(t) = state.tasks.get_mut(&task_id) {
                t.last_status = "PENDING".into();
            }
            task_jsons.push(task_to_json(&task));
            spawned_tasks.push(task_id.clone());
        }
        // Release the write guard before calling into the runtime or
        // re-acquiring the lock below.
        drop(accounts);

        if let Some(rt) = runtime {
            // The runtime receives the shared state handle, the task id, and
            // the account for each task.
            for id in &spawned_tasks {
                rt.clone()
                    .run_task(self.state.clone(), id.clone(), account.clone());
            }
        } else {
            // No runtime configured: mark every task just created as failed
            // to start.
            let mut accounts = self.state.write();
            if let Some(state) = accounts.get_mut(&account) {
                let mut cluster_drains: Vec<String> = Vec::new();
                for id in &spawned_tasks {
                    if let Some(t) = state.tasks.get_mut(id) {
                        t.last_status = "STOPPED".into();
                        t.desired_status = "STOPPED".into();
                        t.stop_code = Some("TaskFailedToStart".into());
                        t.stopped_reason = Some(
                            "No container runtime available (docker/podman not installed)".into(),
                        );
                        t.stopped_at = Some(Utc::now());
                        for c in t.containers.iter_mut() {
                            c.last_status = "STOPPED".into();
                        }
                        cluster_drains.push(t.cluster_name.clone());
                    }
                }
                // Undo the pending-count increments made above, never going
                // below zero.
                for name in cluster_drains {
                    if let Some(cluster) = state.clusters.get_mut(&name) {
                        if cluster.pending_tasks_count > 0 {
                            cluster.pending_tasks_count -= 1;
                        }
                    }
                }
            }
        }

        Ok(AwsResponse::ok_json(json!({
            "tasks": task_jsons,
            "failures": [],
        })))
    }
1862
    /// Handles `StartTask` by delegating to [`Self::run_task`] unchanged.
    ///
    /// The request is processed exactly as a `RunTask` request, so the
    /// accepted fields, the response shape, and the errors are those of
    /// `run_task`; no `StartTask`-specific field is read here.
    fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        self.run_task(request)
    }
1870
1871 async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1872 let body = request.json_body();
1873 let task_ref = req_str(&body, "task")?;
1874 let reason = opt_str(&body, "reason")
1875 .unwrap_or("UserInitiated")
1876 .to_string();
1877 let cluster_ref = opt_str(&body, "cluster");
1878 let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1879
1880 let (task_id, account, task_snapshot) = {
1881 let account = request.account_id.clone();
1882 let mut accounts = self.state.write();
1883 let state = accounts
1884 .get_mut(&account)
1885 .ok_or_else(|| task_not_found(task_ref))?;
1886 let task_id = resolve_task_id(state, task_ref)?;
1887 let task = state
1888 .tasks
1889 .get_mut(&task_id)
1890 .ok_or_else(|| task_not_found(task_ref))?;
1891 task.desired_status = "STOPPED".into();
1892 task.stopping_at = Some(Utc::now());
1893 task.stopped_reason = Some(reason.clone());
1894 task.stop_code = Some("UserInitiated".into());
1895 (task_id, account, task.clone())
1896 };
1897 if let Some(rt) = &self.runtime {
1898 rt.stop_task(&task_id, &reason).await;
1899 }
1900 let _ = account;
1901 Ok(AwsResponse::ok_json(json!({
1902 "task": task_to_json(&task_snapshot),
1903 })))
1904 }
1905
1906 fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1907 let body = request.json_body();
1908 let refs: Vec<String> = body
1909 .get("tasks")
1910 .and_then(|v| v.as_array())
1911 .map(|arr| {
1912 arr.iter()
1913 .filter_map(|v| v.as_str().map(String::from))
1914 .collect()
1915 })
1916 .unwrap_or_default();
1917 let include_tags = body
1918 .get("include")
1919 .and_then(|v| v.as_array())
1920 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
1921 .unwrap_or(false);
1922
1923 let account = request.account_id.clone();
1924 let accounts = self.state.read();
1925 let Some(state) = accounts.get(&account) else {
1926 return Ok(AwsResponse::ok_json(json!({
1927 "tasks": [],
1928 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
1929 })));
1930 };
1931 let mut found = Vec::new();
1932 let mut failures = Vec::new();
1933 for input in &refs {
1934 let task_id = task_id_from_ref(input);
1935 match state.tasks.get(&task_id) {
1936 Some(t) => {
1937 let mut v = task_to_json(t);
1938 if include_tags {
1939 v.as_object_mut()
1940 .unwrap()
1941 .insert("tags".into(), tags_json(&t.tags));
1942 }
1943 found.push(v);
1944 }
1945 None => {
1946 failures.push(json!({
1947 "arn": input,
1948 "reason": "MISSING",
1949 }));
1950 }
1951 }
1952 }
1953 Ok(AwsResponse::ok_json(json!({
1954 "tasks": found,
1955 "failures": failures,
1956 })))
1957 }
1958
1959 fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1960 let body = request.json_body();
1961 let cluster_ref = opt_str(&body, "cluster");
1962 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1963 let family = opt_str(&body, "family");
1964 let status_filter = opt_str(&body, "desiredStatus");
1965 let started_by = opt_str(&body, "startedBy");
1966 let max_results = body
1967 .get("maxResults")
1968 .and_then(|v| v.as_i64())
1969 .filter(|n| (1..=100).contains(n))
1970 .map(|n| n as usize)
1971 .unwrap_or(100);
1972 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1973
1974 let account = request.account_id.clone();
1975 let accounts = self.state.read();
1976 let mut arns: Vec<String> = match accounts.get(&account) {
1977 Some(state) => state
1978 .tasks
1979 .values()
1980 .filter(|t| t.cluster_name == cluster_name)
1981 .filter(|t| family.is_none_or(|f| t.family == f))
1982 .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
1983 .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
1984 .map(|t| t.task_arn.clone())
1985 .collect(),
1986 None => Vec::new(),
1987 };
1988 arns.sort();
1989 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1990 let end = (start + max_results).min(arns.len());
1991 let page = arns[start..end].to_vec();
1992 let mut out = json!({"taskArns": page});
1993 if end < arns.len() {
1994 out.as_object_mut()
1995 .unwrap()
1996 .insert("nextToken".into(), json!(end.to_string()));
1997 }
1998 Ok(AwsResponse::ok_json(out))
1999 }
2000}
2001
2002fn task_not_found(task_ref: &str) -> AwsServiceError {
2003 AwsServiceError::aws_error(
2004 StatusCode::BAD_REQUEST,
2005 "ClientException",
2006 format!("Task not found: {task_ref}"),
2007 )
2008}
2009
/// Extracts the task id from a task reference.
///
/// The id is whatever follows the last `/` in `input`, which covers full
/// task ARNs and `cluster/id` forms; an input without any `/` is returned
/// unchanged. An input ending in `/` yields an empty string.
fn task_id_from_ref(input: &str) -> String {
    // `rsplit` always yields at least one item, so the fallback is never
    // taken; it only exists to avoid an `unwrap`.
    input.rsplit('/').next().unwrap_or(input).to_string()
}
2017
2018fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
2019 let id = task_id_from_ref(task_ref);
2020 if state.tasks.contains_key(&id) {
2021 Ok(id)
2022 } else {
2023 Err(task_not_found(task_ref))
2024 }
2025}
2026
2027fn task_to_json(task: &Task) -> Value {
2028 let mut map = serde_json::Map::new();
2029 map.insert("taskArn".into(), json!(task.task_arn));
2030 map.insert("clusterArn".into(), json!(task.cluster_arn));
2031 map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
2032 map.insert("lastStatus".into(), json!(task.last_status));
2033 map.insert("desiredStatus".into(), json!(task.desired_status));
2034 map.insert("launchType".into(), json!(task.launch_type));
2035 if let Some(ref v) = task.platform_version {
2036 map.insert("platformVersion".into(), json!(v));
2037 }
2038 if let Some(ref v) = task.cpu {
2039 map.insert("cpu".into(), json!(v));
2040 }
2041 if let Some(ref v) = task.memory {
2042 map.insert("memory".into(), json!(v));
2043 }
2044 map.insert(
2045 "containers".into(),
2046 Value::Array(task.containers.iter().map(container_to_json).collect()),
2047 );
2048 map.insert("overrides".into(), task.overrides.clone());
2049 if let Some(ref v) = task.started_by {
2050 map.insert("startedBy".into(), json!(v));
2051 }
2052 if let Some(ref v) = task.group {
2053 map.insert("group".into(), json!(v));
2054 }
2055 map.insert("connectivity".into(), json!(task.connectivity));
2056 if let Some(ref v) = task.stop_code {
2057 map.insert("stopCode".into(), json!(v));
2058 }
2059 if let Some(ref v) = task.stopped_reason {
2060 map.insert("stoppedReason".into(), json!(v));
2061 }
2062 if let Some(ref v) = task.task_role_arn {
2063 map.insert("taskRoleArn".into(), json!(v));
2064 }
2065 if let Some(ref v) = task.execution_role_arn {
2066 map.insert("executionRoleArn".into(), json!(v));
2067 }
2068 map.insert("createdAt".into(), json!(task.created_at.timestamp()));
2069 if let Some(ts) = task.started_at {
2070 map.insert("startedAt".into(), json!(ts.timestamp()));
2071 }
2072 if let Some(ts) = task.stopping_at {
2073 map.insert("stoppingAt".into(), json!(ts.timestamp()));
2074 }
2075 if let Some(ts) = task.stopped_at {
2076 map.insert("stoppedAt".into(), json!(ts.timestamp()));
2077 }
2078 if let Some(ts) = task.pull_started_at {
2079 map.insert("pullStartedAt".into(), json!(ts.timestamp()));
2080 }
2081 if let Some(ts) = task.pull_stopped_at {
2082 map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
2083 }
2084 if let Some(ts) = task.connectivity_at {
2085 map.insert("connectivityAt".into(), json!(ts.timestamp()));
2086 }
2087 Value::Object(map)
2088}
2089
2090fn container_to_json(container: &Container) -> Value {
2091 let mut map = serde_json::Map::new();
2092 map.insert("containerArn".into(), json!(container.container_arn));
2093 map.insert("taskArn".into(), json!(container.task_arn));
2094 map.insert("name".into(), json!(container.name));
2095 map.insert("image".into(), json!(container.image));
2096 map.insert("lastStatus".into(), json!(container.last_status));
2097 map.insert("essential".into(), json!(container.essential));
2098 if let Some(code) = container.exit_code {
2099 map.insert("exitCode".into(), json!(code));
2100 }
2101 if let Some(ref r) = container.reason {
2102 map.insert("reason".into(), json!(r));
2103 }
2104 if let Some(ref id) = container.runtime_id {
2105 map.insert("runtimeId".into(), json!(id));
2106 }
2107 if let Some(ref v) = container.cpu {
2108 map.insert("cpu".into(), json!(v));
2109 }
2110 if let Some(ref v) = container.memory {
2111 map.insert("memory".into(), json!(v));
2112 }
2113 if let Some(ref v) = container.memory_reservation {
2114 map.insert("memoryReservation".into(), json!(v));
2115 }
2116 map.insert("networkBindings".into(), json!(container.network_bindings));
2117 map.insert(
2118 "networkInterfaces".into(),
2119 json!(container.network_interfaces),
2120 );
2121 if let Some(ref v) = container.health_status {
2122 map.insert("healthStatus".into(), json!(v));
2123 }
2124 Value::Object(map)
2125}
2126
2127impl EcsService {
2130 fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2131 let body = request.json_body();
2132 let service_name = req_str(&body, "serviceName")?.to_string();
2133 validate_service_name(&service_name)?;
2134 let td_ref = req_str(&body, "taskDefinition")?;
2135 let cluster_ref = opt_str(&body, "cluster");
2136 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2137 let desired_count = body
2138 .get("desiredCount")
2139 .and_then(|v| v.as_i64())
2140 .filter(|n| *n >= 0)
2141 .unwrap_or(1) as i32;
2142 let launch_type = opt_str(&body, "launchType")
2143 .unwrap_or("FARGATE")
2144 .to_string();
2145 let scheduling = opt_str(&body, "schedulingStrategy")
2146 .unwrap_or("REPLICA")
2147 .to_string();
2148 let deployment_controller = body
2149 .get("deploymentController")
2150 .and_then(|v| v.get("type"))
2151 .and_then(|v| v.as_str())
2152 .unwrap_or("ECS")
2153 .to_string();
2154 let deployment_config = body.get("deploymentConfiguration");
2155 let min_healthy = deployment_config
2156 .and_then(|d| d.get("minimumHealthyPercent"))
2157 .and_then(|v| v.as_i64())
2158 .map(|n| n as i32);
2159 let max_percent = deployment_config
2160 .and_then(|d| d.get("maximumPercent"))
2161 .and_then(|v| v.as_i64())
2162 .map(|n| n as i32);
2163 let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
2164 let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
2165 enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
2166 rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
2167 });
2168 let tags = parse_tags(&body);
2169 let role_arn = opt_str(&body, "role").map(String::from);
2170 let load_balancers: Vec<Value> = body
2171 .get("loadBalancers")
2172 .and_then(|v| v.as_array())
2173 .cloned()
2174 .unwrap_or_default();
2175 let service_registries: Vec<Value> = body
2176 .get("serviceRegistries")
2177 .and_then(|v| v.as_array())
2178 .cloned()
2179 .unwrap_or_default();
2180 let placement_constraints: Vec<Value> = body
2181 .get("placementConstraints")
2182 .and_then(|v| v.as_array())
2183 .cloned()
2184 .unwrap_or_default();
2185 let placement_strategy: Vec<Value> = body
2186 .get("placementStrategy")
2187 .and_then(|v| v.as_array())
2188 .cloned()
2189 .unwrap_or_default();
2190 let network_configuration = body.get("networkConfiguration").cloned();
2191
2192 let runtime = self.runtime.clone();
2193 let account = request.account_id.clone();
2194 let principal_arn = request
2195 .principal
2196 .as_ref()
2197 .map(|p| p.arn.clone())
2198 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2199
2200 let (service_json, spawn_task_ids) = {
2201 let mut accounts = self.state.write();
2202 let state = accounts.get_or_create(&account);
2203 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2205 let revisions = state
2206 .task_definitions
2207 .get(&family)
2208 .ok_or_else(|| task_definition_not_found(td_ref))?;
2209 let td = match rev {
2210 Some(n) => revisions
2211 .get(&n)
2212 .ok_or_else(|| task_definition_not_found(td_ref))?,
2213 None => latest_active_revision(revisions)
2214 .ok_or_else(|| task_definition_not_found(td_ref))?,
2215 };
2216 let td_arn = td.task_definition_arn.clone();
2217 let td_family = td.family.clone();
2218 let td_revision = td.revision;
2219 let cluster_arn = state
2220 .clusters
2221 .get(&cluster_name)
2222 .map(|c| c.cluster_arn.clone())
2223 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2224 let service_arn = state.service_arn(&cluster_name, &service_name);
2225 let key = EcsState::service_key(&cluster_name, &service_name);
2226 if let Some(existing) = state.services.get(&key) {
2227 if existing.status != "INACTIVE" {
2228 return Err(service_already_exists(&service_name));
2229 }
2230 }
2231 let deployment = Deployment {
2232 deployment_id: format!(
2233 "ecs-svc/{}",
2234 uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2235 ),
2236 status: "PRIMARY".into(),
2237 task_definition_arn: td_arn.clone(),
2238 desired_count,
2239 pending_count: 0,
2240 running_count: 0,
2241 failed_tasks: 0,
2242 created_at: Utc::now(),
2243 updated_at: Utc::now(),
2244 launch_type: launch_type.clone(),
2245 rollout_state: "IN_PROGRESS".into(),
2246 rollout_state_reason: Some("ECS deployment in progress.".into()),
2247 };
2248 let service = Service {
2249 service_name: service_name.clone(),
2250 service_arn: service_arn.clone(),
2251 cluster_name: cluster_name.clone(),
2252 cluster_arn: cluster_arn.clone(),
2253 task_definition_arn: td_arn,
2254 family: td_family,
2255 revision: td_revision,
2256 desired_count,
2257 running_count: 0,
2258 pending_count: 0,
2259 launch_type: launch_type.clone(),
2260 status: "ACTIVE".into(),
2261 scheduling_strategy: scheduling,
2262 deployment_controller,
2263 minimum_healthy_percent: min_healthy,
2264 maximum_percent: max_percent,
2265 circuit_breaker,
2266 deployments: vec![deployment],
2267 load_balancers,
2268 service_registries,
2269 placement_constraints,
2270 placement_strategy,
2271 network_configuration,
2272 tags: tags.clone(),
2273 created_at: Utc::now(),
2274 created_by: Some(principal_arn.clone()),
2275 role_arn,
2276 };
2277 state.services.insert(key.clone(), service.clone());
2278 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2279 cluster.active_services_count += 1;
2280 }
2281 state.push_event(crate::state::LifecycleEvent {
2282 at: Utc::now(),
2283 event_type: "ServiceCreated".into(),
2284 task_arn: None,
2285 cluster_arn: Some(cluster_arn),
2286 last_status: Some("ACTIVE".into()),
2287 detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
2288 });
2289 let ids =
2290 spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
2291 (service_to_json(state.services.get(&key).unwrap()), ids)
2292 };
2293
2294 if let Some(rt) = runtime {
2295 for id in spawn_task_ids {
2296 rt.clone().run_task(self.state.clone(), id, account.clone());
2297 }
2298 }
2299
2300 Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2301 }
2302
2303 fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2304 let body = request.json_body();
2305 let service_ref = req_str(&body, "service")?;
2306 let service_name = service_name_from_ref(service_ref);
2307 let cluster_ref = opt_str(&body, "cluster");
2308 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2309 let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
2310 let new_td_ref = opt_str(&body, "taskDefinition");
2311 let account = request.account_id.clone();
2312 let principal_arn = request
2313 .principal
2314 .as_ref()
2315 .map(|p| p.arn.clone())
2316 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2317 let runtime = self.runtime.clone();
2318
2319 let (service_json, spawn_ids, stop_ids) = {
2320 let mut accounts = self.state.write();
2321 let state = accounts
2322 .get_mut(&account)
2323 .ok_or_else(|| service_not_found(&service_name))?;
2324 let key = EcsState::service_key(&cluster_name, &service_name);
2325 if !state.services.contains_key(&key) {
2326 return Err(service_not_found(&service_name));
2327 }
2328
2329 let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
2331 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2332 let revisions = state
2333 .task_definitions
2334 .get(&family)
2335 .ok_or_else(|| task_definition_not_found(td_ref))?;
2336 let td = match rev {
2337 Some(n) => revisions
2338 .get(&n)
2339 .ok_or_else(|| task_definition_not_found(td_ref))?,
2340 None => latest_active_revision(revisions)
2341 .ok_or_else(|| task_definition_not_found(td_ref))?,
2342 };
2343 (
2344 Some(td.task_definition_arn.clone()),
2345 td.family.clone(),
2346 td.revision,
2347 )
2348 } else {
2349 let svc = state.services.get(&key).unwrap();
2350 (None, svc.family.clone(), svc.revision)
2351 };
2352
2353 let service_cluster_arn;
2354 let launch_type_clone;
2355 let effective_desired;
2356 let old_desired;
2357 let mut old_deployments_drained: Vec<String> = Vec::new();
2358 let mut new_deployment_triggered = false;
2359
2360 {
2361 let svc = state.services.get_mut(&key).unwrap();
2362 old_desired = svc.desired_count;
2363 service_cluster_arn = svc.cluster_arn.clone();
2364 launch_type_clone = svc.launch_type.clone();
2365
2366 if let Some(n) = new_desired {
2367 let n = n.max(0) as i32;
2368 svc.desired_count = n;
2369 if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
2370 d.desired_count = n;
2371 d.updated_at = Utc::now();
2372 }
2373 }
2374
2375 if let Some(arn) = new_td_arn.clone() {
2376 for d in svc.deployments.iter_mut() {
2379 if d.status == "PRIMARY" {
2380 d.status = "ACTIVE".into();
2381 old_deployments_drained.push(d.deployment_id.clone());
2382 }
2383 }
2384 svc.deployments.insert(
2385 0,
2386 Deployment {
2387 deployment_id: format!(
2388 "ecs-svc/{}",
2389 uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2390 ),
2391 status: "PRIMARY".into(),
2392 task_definition_arn: arn.clone(),
2393 desired_count: svc.desired_count,
2394 pending_count: 0,
2395 running_count: 0,
2396 failed_tasks: 0,
2397 created_at: Utc::now(),
2398 updated_at: Utc::now(),
2399 launch_type: svc.launch_type.clone(),
2400 rollout_state: "IN_PROGRESS".into(),
2401 rollout_state_reason: Some("ECS deployment in progress.".into()),
2402 },
2403 );
2404 svc.task_definition_arn = arn;
2405 svc.family = new_family;
2406 svc.revision = new_revision;
2407 new_deployment_triggered = true;
2408 }
2409
2410 effective_desired = svc.desired_count;
2411 }
2412
2413 let mut spawn: Vec<String> = Vec::new();
2415 let mut stop: Vec<String> = Vec::new();
2416
2417 let service_tag = format!("ecs-svc/{}", service_name);
2419 let current_tasks: Vec<(String, String)> = state
2420 .tasks
2421 .iter()
2422 .filter(|(_, t)| {
2423 t.started_by.as_deref() == Some(service_tag.as_str())
2424 && t.cluster_name == cluster_name
2425 && t.last_status != "STOPPED"
2426 })
2427 .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
2428 .collect();
2429
2430 let current_count = current_tasks.len() as i32;
2431 if effective_desired > current_count {
2432 let add = (effective_desired - current_count) as usize;
2433 let svc_snapshot = state.services.get(&key).unwrap().clone();
2434 let mut new_ids = spawn_service_tasks(
2435 state,
2436 &svc_snapshot,
2437 add as i32,
2438 &principal_arn,
2439 &launch_type_clone,
2440 );
2441 spawn.append(&mut new_ids);
2442 } else if effective_desired < current_count {
2443 let remove = (current_count - effective_desired) as usize;
2444 for (id, _) in current_tasks.iter().take(remove) {
2445 stop.push(id.clone());
2446 }
2447 }
2448
2449 if new_deployment_triggered {
2455 let new_td_arn_match = state
2456 .services
2457 .get(&key)
2458 .unwrap()
2459 .task_definition_arn
2460 .clone();
2461 let kept_on_new_td: i32 = current_tasks
2465 .iter()
2466 .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
2467 .count() as i32;
2468 for (id, t_arn) in ¤t_tasks {
2469 if *t_arn != new_td_arn_match && !stop.contains(id) {
2470 stop.push(id.clone());
2471 }
2472 }
2473 let already_spawned = spawn.len() as i32;
2474 let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
2475 if need > 0 {
2476 let svc_snapshot = state.services.get(&key).unwrap().clone();
2477 let mut more = spawn_service_tasks(
2478 state,
2479 &svc_snapshot,
2480 need,
2481 &principal_arn,
2482 &launch_type_clone,
2483 );
2484 spawn.append(&mut more);
2485 }
2486 }
2487
2488 state.push_event(crate::state::LifecycleEvent {
2489 at: Utc::now(),
2490 event_type: "ServiceUpdated".into(),
2491 task_arn: None,
2492 cluster_arn: Some(service_cluster_arn),
2493 last_status: Some("ACTIVE".into()),
2494 detail: json!({
2495 "serviceArn": state.services.get(&key).unwrap().service_arn,
2496 "desiredCount": effective_desired,
2497 "previousDesiredCount": old_desired,
2498 "newDeployment": new_deployment_triggered,
2499 "drainedDeployments": old_deployments_drained,
2500 }),
2501 });
2502
2503 let svc = state.services.get(&key).unwrap();
2504 (service_to_json(svc), spawn, stop)
2505 };
2506
2507 if let Some(rt) = runtime {
2508 for id in spawn_ids {
2509 rt.clone().run_task(self.state.clone(), id, account.clone());
2510 }
2511 for id in stop_ids {
2512 let rt2 = rt.clone();
2513 let id_clone = id.clone();
2514 tokio::spawn(async move {
2515 rt2.stop_task(&id_clone, "ECS service scale-down").await;
2516 });
2517 let mut accounts = self.state.write();
2519 if let Some(state) = accounts.get_mut(&account) {
2520 if let Some(task) = state.tasks.get_mut(&id) {
2521 task.desired_status = "STOPPED".into();
2522 task.stopping_at = Some(Utc::now());
2523 }
2524 }
2525 }
2526 }
2527
2528 Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2529 }
2530
2531 async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2532 let body = request.json_body();
2533 let service_ref = req_str(&body, "service")?;
2534 let service_name = service_name_from_ref(service_ref);
2535 let cluster_ref = opt_str(&body, "cluster");
2536 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2537 let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
2538
2539 let (snapshot, task_ids_to_stop) = {
2540 let mut accounts = self.state.write();
2541 let state = accounts
2542 .get_mut(&request.account_id)
2543 .ok_or_else(|| service_not_found(&service_name))?;
2544 let key = EcsState::service_key(&cluster_name, &service_name);
2545 let svc = state
2546 .services
2547 .get_mut(&key)
2548 .ok_or_else(|| service_not_found(&service_name))?;
2549 if !force && svc.desired_count > 0 {
2550 return Err(client_exception(
2551 "The service cannot be stopped while it is scaled above 0. \
2552 Either set desiredCount to 0 first, or pass force=true.",
2553 ));
2554 }
2555 svc.desired_count = 0;
2556 svc.status = "DRAINING".into();
2557 let service_tag = format!("ecs-svc/{}", service_name);
2558 let stop_ids: Vec<String> = state
2559 .tasks
2560 .iter()
2561 .filter(|(_, t)| {
2562 t.started_by.as_deref() == Some(service_tag.as_str())
2563 && t.cluster_name == cluster_name
2564 && t.last_status != "STOPPED"
2565 })
2566 .map(|(id, _)| id.clone())
2567 .collect();
2568 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2569 if cluster.active_services_count > 0 {
2570 cluster.active_services_count -= 1;
2571 }
2572 }
2573 let svc_snapshot = state.services.get(&key).unwrap().clone();
2574 state.services.remove(&key);
2575 state.push_event(crate::state::LifecycleEvent {
2576 at: Utc::now(),
2577 event_type: "ServiceDeleted".into(),
2578 task_arn: None,
2579 cluster_arn: Some(svc_snapshot.cluster_arn.clone()),
2580 last_status: Some("DRAINING".into()),
2581 detail: json!({"serviceArn": svc_snapshot.service_arn}),
2582 });
2583 (svc_snapshot, stop_ids)
2584 };
2585
2586 if let Some(rt) = &self.runtime {
2587 for id in &task_ids_to_stop {
2588 rt.stop_task(id, "ECS service deletion").await;
2589 let mut accounts = self.state.write();
2590 if let Some(state) = accounts.get_mut(&request.account_id) {
2591 if let Some(task) = state.tasks.get_mut(id) {
2592 task.desired_status = "STOPPED".into();
2593 task.stopping_at = Some(Utc::now());
2594 }
2595 }
2596 }
2597 }
2598
2599 Ok(AwsResponse::ok_json(
2600 json!({ "service": service_to_json(&snapshot) }),
2601 ))
2602 }
2603
2604 fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2605 let body = request.json_body();
2606 let cluster_ref = opt_str(&body, "cluster");
2607 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2608 let refs: Vec<String> = body
2609 .get("services")
2610 .and_then(|v| v.as_array())
2611 .map(|arr| {
2612 arr.iter()
2613 .filter_map(|v| v.as_str().map(String::from))
2614 .collect()
2615 })
2616 .unwrap_or_default();
2617
2618 let account = request.account_id.clone();
2619 let accounts = self.state.read();
2620 let mut found = Vec::new();
2621 let mut failures = Vec::new();
2622 let Some(state) = accounts.get(&account) else {
2623 for r in &refs {
2624 failures.push(json!({"arn": r, "reason": "MISSING"}));
2625 }
2626 return Ok(AwsResponse::ok_json(
2627 json!({"services": found, "failures": failures}),
2628 ));
2629 };
2630 for r in &refs {
2631 let name = service_name_from_ref(r);
2632 let key = EcsState::service_key(&cluster_name, &name);
2633 match state.services.get(&key) {
2634 Some(svc) => {
2635 let mut v = service_to_json(svc);
2636 recompute_service_counts(state, &name, &cluster_name, &mut v);
2638 found.push(v);
2639 }
2640 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
2641 }
2642 }
2643 Ok(AwsResponse::ok_json(json!({
2644 "services": found,
2645 "failures": failures,
2646 })))
2647 }
2648
2649 fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2650 let body = request.json_body();
2651 let cluster_ref = opt_str(&body, "cluster");
2652 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2653 let launch_type = opt_str(&body, "launchType");
2654 let scheduling = opt_str(&body, "schedulingStrategy");
2655 let max_results = body
2656 .get("maxResults")
2657 .and_then(|v| v.as_i64())
2658 .filter(|n| (1..=100).contains(n))
2659 .map(|n| n as usize)
2660 .unwrap_or(100);
2661 let next_token = opt_str(&body, "nextToken").unwrap_or("");
2662
2663 let account = request.account_id.clone();
2664 let accounts = self.state.read();
2665 let mut arns: Vec<String> = match accounts.get(&account) {
2666 Some(state) => state
2667 .services
2668 .values()
2669 .filter(|s| s.cluster_name == cluster_name)
2670 .filter(|s| launch_type.is_none_or(|lt| s.launch_type == lt))
2671 .filter(|s| scheduling.is_none_or(|sc| s.scheduling_strategy == sc))
2672 .map(|s| s.service_arn.clone())
2673 .collect(),
2674 None => Vec::new(),
2675 };
2676 arns.sort();
2677 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
2678 let end = (start + max_results).min(arns.len());
2679 let page = arns[start..end].to_vec();
2680 let mut out = json!({"serviceArns": page});
2681 if end < arns.len() {
2682 out.as_object_mut()
2683 .unwrap()
2684 .insert("nextToken".into(), json!(end.to_string()));
2685 }
2686 Ok(AwsResponse::ok_json(out))
2687 }
2688
2689 fn list_services_by_namespace(
2690 &self,
2691 request: &AwsRequest,
2692 ) -> Result<AwsResponse, AwsServiceError> {
2693 let body = request.json_body();
2699 let namespace = req_str(&body, "namespace")?.to_string();
2700 let account = request.account_id.clone();
2701 let accounts = self.state.read();
2702 let mut arns: Vec<String> = match accounts.get(&account) {
2703 Some(state) => state
2704 .services
2705 .values()
2706 .filter(|s| {
2707 s.service_registries.iter().any(|r| {
2708 r.get("registryArn")
2709 .and_then(|v| v.as_str())
2710 .is_some_and(|arn| arn.contains(&namespace))
2711 })
2712 })
2713 .map(|s| s.service_arn.clone())
2714 .collect(),
2715 None => Vec::new(),
2716 };
2717 arns.sort();
2718 Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
2719 }
2720}
2721
2722fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
2723 if name.is_empty() || name.len() > 255 {
2724 return Err(invalid_parameter("Service name must be 1-255 characters"));
2725 }
2726 let ok = name
2727 .chars()
2728 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
2729 if !ok {
2730 return Err(invalid_parameter(
2731 "Service name may only contain letters, numbers, hyphens, and underscores",
2732 ));
2733 }
2734 Ok(())
2735}
2736
/// Extracts the service name from a service reference.
///
/// Returns everything after the last `/` in `input`; an input without any
/// `/` is taken to be the name itself and returned unchanged.
fn service_name_from_ref(input: &str) -> String {
    input
        .rsplit_once('/')
        .map_or(input, |(_, name)| name)
        .to_owned()
}
2743
2744fn service_not_found(name: &str) -> AwsServiceError {
2745 AwsServiceError::aws_error(
2746 StatusCode::BAD_REQUEST,
2747 "ServiceNotFoundException",
2748 format!("The service could not be found: {name}"),
2749 )
2750}
2751
2752fn service_already_exists(name: &str) -> AwsServiceError {
2753 AwsServiceError::aws_error(
2754 StatusCode::BAD_REQUEST,
2755 "ServiceNotActiveException",
2756 format!("The service {name} already exists"),
2757 )
2758}
2759
/// Creates `count` new task records for `service` and returns their ids.
///
/// Each task is built from the task definition revision the service points
/// at (`service.family` / `service.revision`), inserted into `state.tasks`
/// in the `PENDING` state with a desired status of `RUNNING`, and tagged
/// with `started_by = "ecs-svc/<service name>"`. The pending task counter of
/// the service's cluster is incremented per task when the cluster is
/// registered.
///
/// Nothing is started here; the returned ids are what callers hand to the
/// runtime.
///
/// Returns an empty vector when `count <= 0` or when the task definition
/// family or revision cannot be found in `state`.
///
/// `principal_arn` is accepted but currently unused.
fn spawn_service_tasks(
    state: &mut EcsState,
    service: &Service,
    count: i32,
    principal_arn: &str,
    launch_type: &str,
) -> Vec<String> {
    if count <= 0 {
        return Vec::new();
    }
    let Some(revisions) = state.task_definitions.get(&service.family) else {
        return Vec::new();
    };
    let Some(td) = revisions.get(&service.revision) else {
        return Vec::new();
    };
    // Copy everything needed out of the task definition up front so the
    // borrow of `state` ends before tasks are inserted below.
    let container_defs = td.container_definitions.clone();
    let cpu = td.cpu.clone();
    let memory = td.memory.clone();
    let task_role = td.task_role_arn.clone();
    let exec_role = td.execution_role_arn.clone();
    let cluster_name = service.cluster_name.clone();
    let cluster_arn = service.cluster_arn.clone();
    let td_arn = service.task_definition_arn.clone();
    let family = service.family.clone();
    let revision = service.revision;
    // Ownership marker used elsewhere to find the tasks of a service.
    let service_tag = format!("ecs-svc/{}", service.service_name);

    let mut ids = Vec::with_capacity(count as usize);
    for _ in 0..count {
        // Task ids are UUIDs with the dashes removed.
        let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
        let task_arn = state.task_arn(&cluster_name, &task_id);
        // One container record per container definition; a definition
        // without a name is called "app" and one without an image gets "".
        let containers: Vec<Container> = container_defs
            .iter()
            .map(|def| Container {
                container_arn: format!(
                    "arn:aws:ecs:{}:{}:container/{}/{}/{}",
                    state.region,
                    state.account_id,
                    cluster_name,
                    task_id,
                    def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
                ),
                name: def
                    .get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("app")
                    .to_string(),
                image: def
                    .get("image")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string(),
                task_arn: task_arn.clone(),
                last_status: "PENDING".into(),
                exit_code: None,
                reason: None,
                runtime_id: None,
                essential: def
                    .get("essential")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(true),
                // Numeric resource settings are stored as decimal strings.
                cpu: def
                    .get("cpu")
                    .and_then(|v| v.as_i64())
                    .map(|n| n.to_string()),
                memory: def
                    .get("memory")
                    .and_then(|v| v.as_i64())
                    .map(|n| n.to_string()),
                memory_reservation: def
                    .get("memoryReservation")
                    .and_then(|v| v.as_i64())
                    .map(|n| n.to_string()),
                network_bindings: Vec::new(),
                network_interfaces: Vec::new(),
                health_status: Some("UNKNOWN".into()),
                managed_agents: None,
            })
            .collect();
        // Log settings come from the first named container that uses the
        // `awslogs` driver with an `awslogs-group` option; the region
        // defaults to the state's region.
        let awslogs = container_defs.iter().find_map(|def| {
            let name = def.get("name").and_then(|v| v.as_str())?.to_string();
            let log_cfg = def.get("logConfiguration")?;
            if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
                return None;
            }
            let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
            Some(AwsLogsConfig {
                group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
                stream_prefix: opts
                    .get("awslogs-stream-prefix")
                    .and_then(|v| v.as_str())
                    .map(String::from),
                region: opts
                    .get("awslogs-region")
                    .and_then(|v| v.as_str())
                    .unwrap_or(&state.region)
                    .to_string(),
                container_name: name,
            })
        });
        let task = Task {
            task_arn: task_arn.clone(),
            task_id: task_id.clone(),
            cluster_arn: cluster_arn.clone(),
            cluster_name: cluster_name.clone(),
            task_definition_arn: td_arn.clone(),
            family: family.clone(),
            revision,
            last_status: "PENDING".into(),
            desired_status: "RUNNING".into(),
            launch_type: launch_type.into(),
            platform_version: Some("1.4.0".into()),
            cpu: cpu.clone(),
            memory: memory.clone(),
            containers,
            overrides: json!({}),
            started_by: Some(service_tag.clone()),
            group: Some(format!("service:{}", service.service_name)),
            connectivity: "CONNECTING".into(),
            stop_code: None,
            stopped_reason: None,
            created_at: Utc::now(),
            started_at: None,
            stopping_at: None,
            stopped_at: None,
            pull_started_at: None,
            pull_stopped_at: None,
            connectivity_at: None,
            started_by_ref_id: None,
            execution_role_arn: exec_role.clone(),
            task_role_arn: task_role.clone(),
            tags: Vec::new(),
            awslogs,
            captured_logs: String::new(),
            protection: None,
        };
        state.tasks.insert(task_id.clone(), task);
        if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
            cluster.pending_tasks_count += 1;
        }
        ids.push(task_id);
    }
    // Explicitly discard the parameter so it does not count as unused.
    let _ = principal_arn;
    ids
}
2910
2911fn recompute_service_counts(
2912 state: &EcsState,
2913 service_name: &str,
2914 cluster_name: &str,
2915 service_json: &mut Value,
2916) {
2917 let service_tag = format!("ecs-svc/{}", service_name);
2918 let mut running = 0i32;
2919 let mut pending = 0i32;
2920 for t in state.tasks.values() {
2921 if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
2922 match t.last_status.as_str() {
2923 "RUNNING" => running += 1,
2924 "PENDING" | "PROVISIONING" => pending += 1,
2925 _ => {}
2926 }
2927 }
2928 }
2929 if let Some(map) = service_json.as_object_mut() {
2930 map.insert("runningCount".into(), json!(running));
2931 map.insert("pendingCount".into(), json!(pending));
2932 }
2933}
2934
2935fn service_to_json(svc: &Service) -> Value {
2936 let mut map = serde_json::Map::new();
2937 map.insert("serviceArn".into(), json!(svc.service_arn));
2938 map.insert("serviceName".into(), json!(svc.service_name));
2939 map.insert("clusterArn".into(), json!(svc.cluster_arn));
2940 map.insert("status".into(), json!(svc.status));
2941 map.insert("desiredCount".into(), json!(svc.desired_count));
2942 map.insert("runningCount".into(), json!(svc.running_count));
2943 map.insert("pendingCount".into(), json!(svc.pending_count));
2944 map.insert("launchType".into(), json!(svc.launch_type));
2945 map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
2946 map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
2947 map.insert(
2948 "deploymentController".into(),
2949 json!({"type": svc.deployment_controller}),
2950 );
2951 let mut deployment_cfg = serde_json::Map::new();
2952 if let Some(n) = svc.minimum_healthy_percent {
2953 deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
2954 }
2955 if let Some(n) = svc.maximum_percent {
2956 deployment_cfg.insert("maximumPercent".into(), json!(n));
2957 }
2958 if let Some(ref cb) = svc.circuit_breaker {
2959 deployment_cfg.insert(
2960 "deploymentCircuitBreaker".into(),
2961 json!({"enable": cb.enable, "rollback": cb.rollback}),
2962 );
2963 }
2964 if !deployment_cfg.is_empty() {
2965 map.insert(
2966 "deploymentConfiguration".into(),
2967 Value::Object(deployment_cfg),
2968 );
2969 }
2970 map.insert(
2971 "deployments".into(),
2972 Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
2973 );
2974 map.insert(
2975 "loadBalancers".into(),
2976 Value::Array(svc.load_balancers.clone()),
2977 );
2978 map.insert(
2979 "serviceRegistries".into(),
2980 Value::Array(svc.service_registries.clone()),
2981 );
2982 map.insert(
2983 "placementConstraints".into(),
2984 Value::Array(svc.placement_constraints.clone()),
2985 );
2986 map.insert(
2987 "placementStrategy".into(),
2988 Value::Array(svc.placement_strategy.clone()),
2989 );
2990 if let Some(ref v) = svc.network_configuration {
2991 map.insert("networkConfiguration".into(), v.clone());
2992 }
2993 if let Some(ref v) = svc.role_arn {
2994 map.insert("roleArn".into(), json!(v));
2995 }
2996 if let Some(ref v) = svc.created_by {
2997 map.insert("createdBy".into(), json!(v));
2998 }
2999 map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
3000 Value::Object(map)
3001}
3002
3003fn deployment_to_json(d: &Deployment) -> Value {
3004 json!({
3005 "id": d.deployment_id,
3006 "status": d.status,
3007 "taskDefinition": d.task_definition_arn,
3008 "desiredCount": d.desired_count,
3009 "pendingCount": d.pending_count,
3010 "runningCount": d.running_count,
3011 "failedTasks": d.failed_tasks,
3012 "createdAt": d.created_at.timestamp(),
3013 "updatedAt": d.updated_at.timestamp(),
3014 "launchType": d.launch_type,
3015 "rolloutState": d.rollout_state,
3016 "rolloutStateReason": d.rollout_state_reason,
3017 })
3018}
3019
3020impl EcsService {
3023 fn register_container_instance(
3024 &self,
3025 request: &AwsRequest,
3026 ) -> Result<AwsResponse, AwsServiceError> {
3027 let body = request.json_body();
3028 let cluster_ref = opt_str(&body, "cluster");
3029 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3030 let ec2_id = opt_str(&body, "instanceIdentityDocument")
3031 .and_then(|s| serde_json::from_str::<Value>(s).ok())
3032 .and_then(|v| {
3033 v.get("instanceId")
3034 .and_then(|x| x.as_str())
3035 .map(String::from)
3036 });
3037 let tags = parse_tags(&body);
3038
3039 let account = request.account_id.clone();
3040 let mut accounts = self.state.write();
3041 let state = accounts.get_or_create(&account);
3042 let cluster_arn = state
3043 .clusters
3044 .get(&cluster_name)
3045 .map(|c| c.cluster_arn.clone())
3046 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
3047 let uuid = uuid::Uuid::new_v4().to_string();
3048 let ci_arn = state.container_instance_arn(&cluster_name, &uuid);
3049 let key = format!("{}/{}", cluster_name, uuid);
3050 let ci = ContainerInstance {
3051 container_instance_arn: ci_arn.clone(),
3052 ec2_instance_id: ec2_id,
3053 cluster_name: cluster_name.clone(),
3054 cluster_arn,
3055 status: "ACTIVE".into(),
3056 version: 1,
3057 version_info: body.get("versionInfo").cloned(),
3058 agent_connected: true,
3059 agent_update_status: None,
3060 remaining_resources: body
3061 .get("totalResources")
3062 .and_then(|v| v.as_array())
3063 .cloned()
3064 .unwrap_or_default(),
3065 registered_resources: body
3066 .get("totalResources")
3067 .and_then(|v| v.as_array())
3068 .cloned()
3069 .unwrap_or_default(),
3070 running_tasks_count: 0,
3071 pending_tasks_count: 0,
3072 registered_at: Utc::now(),
3073 attributes: body
3074 .get("attributes")
3075 .and_then(|v| v.as_array())
3076 .map(|arr| {
3077 arr.iter()
3078 .filter_map(|a| {
3079 let name = a.get("name").and_then(|v| v.as_str())?;
3080 Some(AttributeRef {
3081 name: name.to_string(),
3082 value: a.get("value").and_then(|v| v.as_str()).map(String::from),
3083 target_type: a
3084 .get("targetType")
3085 .and_then(|v| v.as_str())
3086 .map(String::from),
3087 target_id: a
3088 .get("targetId")
3089 .and_then(|v| v.as_str())
3090 .map(String::from),
3091 })
3092 })
3093 .collect()
3094 })
3095 .unwrap_or_default(),
3096 tags,
3097 capacity_provider_name: None,
3098 health_status: None,
3099 };
3100 state.container_instances.insert(key, ci.clone());
3101 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3102 cluster.registered_container_instances_count += 1;
3103 }
3104 Ok(AwsResponse::ok_json(json!({
3105 "containerInstance": container_instance_to_json(&ci),
3106 })))
3107 }
3108
3109 fn deregister_container_instance(
3110 &self,
3111 request: &AwsRequest,
3112 ) -> Result<AwsResponse, AwsServiceError> {
3113 let body = request.json_body();
3114 let ci_ref = req_str(&body, "containerInstance")?.to_string();
3115 let cluster_ref = opt_str(&body, "cluster");
3116 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3117 let id = container_instance_id_from_ref(&ci_ref);
3118 let key = format!("{}/{}", cluster_name, id);
3119
3120 let account = request.account_id.clone();
3121 let mut accounts = self.state.write();
3122 let state = accounts
3123 .get_mut(&account)
3124 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3125 let mut ci = state
3126 .container_instances
3127 .remove(&key)
3128 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3129 ci.status = "INACTIVE".into();
3130 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
3131 if cluster.registered_container_instances_count > 0 {
3132 cluster.registered_container_instances_count -= 1;
3133 }
3134 }
3135 Ok(AwsResponse::ok_json(json!({
3136 "containerInstance": container_instance_to_json(&ci),
3137 })))
3138 }
3139
3140 fn describe_container_instances(
3141 &self,
3142 request: &AwsRequest,
3143 ) -> Result<AwsResponse, AwsServiceError> {
3144 let body = request.json_body();
3145 let cluster_ref = opt_str(&body, "cluster");
3146 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3147 let refs: Vec<String> = body
3148 .get("containerInstances")
3149 .and_then(|v| v.as_array())
3150 .map(|arr| {
3151 arr.iter()
3152 .filter_map(|v| v.as_str().map(String::from))
3153 .collect()
3154 })
3155 .unwrap_or_default();
3156
3157 let accounts = self.state.read();
3158 let mut found = Vec::new();
3159 let mut failures = Vec::new();
3160 if let Some(state) = accounts.get(&request.account_id) {
3161 for r in &refs {
3162 let id = container_instance_id_from_ref(r);
3163 let key = format!("{}/{}", cluster_name, id);
3164 match state.container_instances.get(&key) {
3165 Some(ci) => found.push(container_instance_to_json(ci)),
3166 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3167 }
3168 }
3169 } else {
3170 for r in &refs {
3171 failures.push(json!({"arn": r, "reason": "MISSING"}));
3172 }
3173 }
3174 Ok(AwsResponse::ok_json(json!({
3175 "containerInstances": found,
3176 "failures": failures,
3177 })))
3178 }
3179
3180 fn list_container_instances(
3181 &self,
3182 request: &AwsRequest,
3183 ) -> Result<AwsResponse, AwsServiceError> {
3184 let body = request.json_body();
3185 let cluster_ref = opt_str(&body, "cluster");
3186 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3187 let status_filter = opt_str(&body, "status");
3188 let max_results = body
3189 .get("maxResults")
3190 .and_then(|v| v.as_i64())
3191 .filter(|n| (1..=100).contains(n))
3192 .map(|n| n as usize)
3193 .unwrap_or(100);
3194 let next_token = opt_str(&body, "nextToken").unwrap_or("");
3195
3196 let accounts = self.state.read();
3197 let mut arns: Vec<String> = match accounts.get(&request.account_id) {
3198 Some(state) => state
3199 .container_instances
3200 .values()
3201 .filter(|ci| ci.cluster_name == cluster_name)
3202 .filter(|ci| status_filter.is_none_or(|s| ci.status == s))
3203 .map(|ci| ci.container_instance_arn.clone())
3204 .collect(),
3205 None => Vec::new(),
3206 };
3207 arns.sort();
3208 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
3209 let end = (start + max_results).min(arns.len());
3210 let page = arns[start..end].to_vec();
3211 let mut out = json!({"containerInstanceArns": page});
3212 if end < arns.len() {
3213 out.as_object_mut()
3214 .unwrap()
3215 .insert("nextToken".into(), json!(end.to_string()));
3216 }
3217 Ok(AwsResponse::ok_json(out))
3218 }
3219
3220 fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3221 let body = request.json_body();
3222 let ci_ref = req_str(&body, "containerInstance")?.to_string();
3223 let cluster_ref = opt_str(&body, "cluster");
3224 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3225 let id = container_instance_id_from_ref(&ci_ref);
3226 let key = format!("{}/{}", cluster_name, id);
3227 let mut accounts = self.state.write();
3228 let state = accounts
3229 .get_mut(&request.account_id)
3230 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3231 let ci = state
3232 .container_instances
3233 .get_mut(&key)
3234 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3235 ci.agent_update_status = Some("UPDATED".into());
3236 Ok(AwsResponse::ok_json(json!({
3237 "containerInstance": container_instance_to_json(ci),
3238 })))
3239 }
3240
3241 fn update_container_instances_state(
3242 &self,
3243 request: &AwsRequest,
3244 ) -> Result<AwsResponse, AwsServiceError> {
3245 let body = request.json_body();
3246 let status = req_str(&body, "status")?.to_string();
3247 let cluster_ref = opt_str(&body, "cluster");
3248 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3249 let refs: Vec<String> = body
3250 .get("containerInstances")
3251 .and_then(|v| v.as_array())
3252 .map(|arr| {
3253 arr.iter()
3254 .filter_map(|v| v.as_str().map(String::from))
3255 .collect()
3256 })
3257 .unwrap_or_default();
3258 let mut accounts = self.state.write();
3259 let state = accounts
3260 .get_mut(&request.account_id)
3261 .ok_or_else(|| client_exception("account not found"))?;
3262 let mut found = Vec::new();
3263 let mut failures = Vec::new();
3264 for r in &refs {
3265 let id = container_instance_id_from_ref(r);
3266 let key = format!("{}/{}", cluster_name, id);
3267 match state.container_instances.get_mut(&key) {
3268 Some(ci) => {
3269 ci.status = status.clone();
3270 found.push(container_instance_to_json(ci));
3271 }
3272 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3273 }
3274 }
3275 Ok(AwsResponse::ok_json(json!({
3276 "containerInstances": found,
3277 "failures": failures,
3278 })))
3279 }
3280
3281 fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3282 let body = request.json_body();
3283 let cluster_ref = opt_str(&body, "cluster");
3284 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3285 let attrs = body
3286 .get("attributes")
3287 .and_then(|v| v.as_array())
3288 .cloned()
3289 .unwrap_or_default();
3290
3291 let mut accounts = self.state.write();
3292 let state = accounts.get_or_create(&request.account_id);
3293 let mut stored = Vec::new();
3294 for a in &attrs {
3295 let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3296 continue;
3297 };
3298 let target_type = a
3299 .get("targetType")
3300 .and_then(|v| v.as_str())
3301 .unwrap_or("container-instance")
3302 .to_string();
3303 let target_id = a
3304 .get("targetId")
3305 .and_then(|v| v.as_str())
3306 .unwrap_or("")
3307 .to_string();
3308 let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
3309 let key = format!("{}/{}/{}", cluster_name, target_id, name);
3310 let attr = Attribute {
3311 cluster_name: cluster_name.clone(),
3312 target_type: target_type.clone(),
3313 target_id: target_id.clone(),
3314 name: name.to_string(),
3315 value: value.clone(),
3316 };
3317 state.attributes.insert(key, attr);
3318 stored.push(json!({
3319 "name": name,
3320 "value": value,
3321 "targetType": target_type,
3322 "targetId": target_id,
3323 }));
3324 }
3325 Ok(AwsResponse::ok_json(json!({"attributes": stored})))
3326 }
3327
3328 fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3329 let body = request.json_body();
3330 let cluster_ref = opt_str(&body, "cluster");
3331 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3332 let attrs = body
3333 .get("attributes")
3334 .and_then(|v| v.as_array())
3335 .cloned()
3336 .unwrap_or_default();
3337 let mut accounts = self.state.write();
3338 let state = accounts.get_or_create(&request.account_id);
3339 let mut deleted = Vec::new();
3340 for a in &attrs {
3341 let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3342 continue;
3343 };
3344 let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
3345 let key = format!("{}/{}/{}", cluster_name, target_id, name);
3346 if let Some(attr) = state.attributes.remove(&key) {
3347 deleted.push(json!({
3348 "name": attr.name,
3349 "value": attr.value,
3350 "targetType": attr.target_type,
3351 "targetId": attr.target_id,
3352 }));
3353 }
3354 }
3355 Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
3356 }
3357
3358 fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3359 let body = request.json_body();
3360 let cluster_ref = opt_str(&body, "cluster");
3361 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3362 let target_type = req_str(&body, "targetType")?.to_string();
3363 let attr_name = opt_str(&body, "attributeName");
3364 let attr_value = opt_str(&body, "attributeValue");
3365
3366 let accounts = self.state.read();
3367 let attrs: Vec<Value> = match accounts.get(&request.account_id) {
3368 Some(state) => state
3369 .attributes
3370 .values()
3371 .filter(|a| a.cluster_name == cluster_name)
3372 .filter(|a| a.target_type == target_type)
3373 .filter(|a| attr_name.is_none_or(|n| a.name == n))
3374 .filter(|a| attr_value.is_none_or(|v| a.value.as_deref() == Some(v)))
3375 .map(|a| {
3376 json!({
3377 "name": a.name,
3378 "value": a.value,
3379 "targetType": a.target_type,
3380 "targetId": a.target_id,
3381 })
3382 })
3383 .collect(),
3384 None => Vec::new(),
3385 };
3386 Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
3387 }
3388
3389 fn create_capacity_provider(
3390 &self,
3391 request: &AwsRequest,
3392 ) -> Result<AwsResponse, AwsServiceError> {
3393 let body = request.json_body();
3394 let name = req_str(&body, "name")?.to_string();
3395 if name.starts_with("aws") || name.starts_with("ecs") {
3396 return Err(invalid_parameter(format!(
3397 "Capacity provider name cannot begin with 'aws' or 'ecs': {name}"
3398 )));
3399 }
3400 let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
3401 let tags = parse_tags(&body);
3402
3403 let mut accounts = self.state.write();
3404 let state = accounts.get_or_create(&request.account_id);
3405 if state.capacity_providers.contains_key(&name) {
3406 return Err(client_exception(format!(
3407 "Capacity provider already exists: {name}"
3408 )));
3409 }
3410 let arn = format!(
3411 "arn:aws:ecs:{}:{}:capacity-provider/{}",
3412 state.region, state.account_id, name
3413 );
3414 let cp = CapacityProvider {
3415 name: name.clone(),
3416 arn,
3417 status: "ACTIVE".into(),
3418 auto_scaling_group_provider,
3419 update_status: None,
3420 update_status_reason: None,
3421 created_at: Utc::now(),
3422 tags,
3423 };
3424 state.capacity_providers.insert(name.clone(), cp.clone());
3425 Ok(AwsResponse::ok_json(json!({
3426 "capacityProvider": capacity_provider_to_json(&cp),
3427 })))
3428 }
3429
3430 fn delete_capacity_provider(
3431 &self,
3432 request: &AwsRequest,
3433 ) -> Result<AwsResponse, AwsServiceError> {
3434 let body = request.json_body();
3435 let input = req_str(&body, "capacityProvider")?.to_string();
3436 let name = capacity_provider_name_from_ref(&input);
3437 let mut accounts = self.state.write();
3438 let state = accounts
3439 .get_mut(&request.account_id)
3440 .ok_or_else(|| capacity_provider_not_found(&name))?;
3441 let mut cp = state
3442 .capacity_providers
3443 .remove(&name)
3444 .ok_or_else(|| capacity_provider_not_found(&name))?;
3445 cp.status = "INACTIVE".into();
3446 Ok(AwsResponse::ok_json(json!({
3447 "capacityProvider": capacity_provider_to_json(&cp),
3448 })))
3449 }
3450
3451 fn describe_capacity_providers(
3452 &self,
3453 request: &AwsRequest,
3454 ) -> Result<AwsResponse, AwsServiceError> {
3455 let body = request.json_body();
3456 let names: Vec<String> = body
3457 .get("capacityProviders")
3458 .and_then(|v| v.as_array())
3459 .map(|arr| {
3460 arr.iter()
3461 .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
3462 .collect()
3463 })
3464 .unwrap_or_default();
3465 let accounts = self.state.read();
3466 let mut found = Vec::new();
3467 let mut failures = Vec::new();
3468 if let Some(state) = accounts.get(&request.account_id) {
3469 if names.is_empty() {
3470 for cp in state.capacity_providers.values() {
3471 found.push(capacity_provider_to_json(cp));
3472 }
3473 } else {
3474 for n in &names {
3475 match state.capacity_providers.get(n) {
3476 Some(cp) => found.push(capacity_provider_to_json(cp)),
3477 None => failures.push(json!({"arn": n, "reason": "MISSING"})),
3478 }
3479 }
3480 }
3481 }
3482 Ok(AwsResponse::ok_json(json!({
3483 "capacityProviders": found,
3484 "failures": failures,
3485 })))
3486 }
3487
3488 fn update_capacity_provider(
3489 &self,
3490 request: &AwsRequest,
3491 ) -> Result<AwsResponse, AwsServiceError> {
3492 let body = request.json_body();
3493 let input = req_str(&body, "name")?.to_string();
3494 let name = capacity_provider_name_from_ref(&input);
3495 let asg = body.get("autoScalingGroupProvider").cloned();
3496 let mut accounts = self.state.write();
3497 let state = accounts
3498 .get_mut(&request.account_id)
3499 .ok_or_else(|| capacity_provider_not_found(&name))?;
3500 let cp = state
3501 .capacity_providers
3502 .get_mut(&name)
3503 .ok_or_else(|| capacity_provider_not_found(&name))?;
3504 if let Some(v) = asg {
3505 cp.auto_scaling_group_provider = Some(v);
3506 }
3507 cp.update_status = Some("UPDATE_COMPLETE".into());
3508 Ok(AwsResponse::ok_json(json!({
3509 "capacityProvider": capacity_provider_to_json(cp),
3510 })))
3511 }
3512
3513 fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3514 let body = request.json_body();
3515 let refs: Vec<String> = body
3516 .get("tasks")
3517 .and_then(|v| v.as_array())
3518 .map(|arr| {
3519 arr.iter()
3520 .filter_map(|v| v.as_str().map(String::from))
3521 .collect()
3522 })
3523 .unwrap_or_default();
3524 let accounts = self.state.read();
3525 let mut protections = Vec::new();
3526 let mut failures = Vec::new();
3527 if let Some(state) = accounts.get(&request.account_id) {
3528 for r in &refs {
3529 let id = task_id_from_ref(r);
3530 match state.tasks.get(&id) {
3531 Some(t) => protections.push(task_protection_json(t)),
3532 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3533 }
3534 }
3535 }
3536 Ok(AwsResponse::ok_json(json!({
3537 "protectedTasks": protections,
3538 "failures": failures,
3539 })))
3540 }
3541
3542 fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3543 let body = request.json_body();
3544 let refs: Vec<String> = body
3545 .get("tasks")
3546 .and_then(|v| v.as_array())
3547 .map(|arr| {
3548 arr.iter()
3549 .filter_map(|v| v.as_str().map(String::from))
3550 .collect()
3551 })
3552 .unwrap_or_default();
3553 let protect = body
3554 .get("protectionEnabled")
3555 .and_then(|v| v.as_bool())
3556 .unwrap_or(false);
3557 let expires_in_minutes = body
3558 .get("expiresInMinutes")
3559 .and_then(|v| v.as_i64())
3560 .unwrap_or(2880);
3561 let expiration = if protect {
3562 Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
3563 } else {
3564 None
3565 };
3566
3567 let mut accounts = self.state.write();
3568 let state = accounts
3569 .get_mut(&request.account_id)
3570 .ok_or_else(|| client_exception("account not found"))?;
3571 let mut protections = Vec::new();
3572 let mut failures = Vec::new();
3573 for r in &refs {
3574 let id = task_id_from_ref(r);
3575 match state.tasks.get_mut(&id) {
3576 Some(t) => {
3577 t.protection = Some(crate::state::TaskProtection {
3578 enabled: protect,
3579 expiration,
3580 });
3581 protections.push(task_protection_json(t));
3582 }
3583 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3584 }
3585 }
3586 Ok(AwsResponse::ok_json(json!({
3587 "protectedTasks": protections,
3588 "failures": failures,
3589 })))
3590 }
3591
3592 fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3593 let body = request.json_body();
3594 let service_ref = req_str(&body, "service")?;
3595 let service_name = service_name_from_ref(service_ref);
3596 let cluster_ref = opt_str(&body, "cluster");
3597 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3598 let task_definition = req_str(&body, "taskDefinition")?.to_string();
3599 let external_id = opt_str(&body, "externalId").map(String::from);
3600 let launch_type = opt_str(&body, "launchType").map(String::from);
3601 let platform_version = opt_str(&body, "platformVersion").map(String::from);
3602 let scale = body.get("scale").cloned();
3603 let tags = parse_tags(&body);
3604 let load_balancers = body
3605 .get("loadBalancers")
3606 .and_then(|v| v.as_array())
3607 .cloned()
3608 .unwrap_or_default();
3609 let service_registries = body
3610 .get("serviceRegistries")
3611 .and_then(|v| v.as_array())
3612 .cloned()
3613 .unwrap_or_default();
3614 let capacity_provider_strategy = body
3615 .get("capacityProviderStrategy")
3616 .and_then(|v| v.as_array())
3617 .cloned()
3618 .unwrap_or_default();
3619
3620 let mut accounts = self.state.write();
3621 let state = accounts
3622 .get_mut(&request.account_id)
3623 .ok_or_else(|| service_not_found(&service_name))?;
3624 let service_key = EcsState::service_key(&cluster_name, &service_name);
3625 let svc = state
3626 .services
3627 .get(&service_key)
3628 .ok_or_else(|| service_not_found(&service_name))?;
3629 if svc.deployment_controller != "EXTERNAL" {
3630 return Err(client_exception(
3631 "CreateTaskSet requires the service to be created with \
3632 deploymentController.type = EXTERNAL",
3633 ));
3634 }
3635 let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
3636 let task_set = TaskSet {
3637 task_set_id: ts_id.clone(),
3638 task_set_arn: format!(
3639 "arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
3640 state.region, state.account_id, cluster_name, service_name, ts_id
3641 ),
3642 service_arn: svc.service_arn.clone(),
3643 cluster_arn: svc.cluster_arn.clone(),
3644 service_name: service_name.clone(),
3645 cluster_name: cluster_name.clone(),
3646 external_id,
3647 status: "ACTIVE".into(),
3648 task_definition,
3649 computed_desired_count: 0,
3650 pending_count: 0,
3651 running_count: 0,
3652 launch_type,
3653 platform_version,
3654 scale,
3655 stability_status: "STABILIZING".into(),
3656 created_at: Utc::now(),
3657 updated_at: Utc::now(),
3658 load_balancers,
3659 service_registries,
3660 capacity_provider_strategy,
3661 tags,
3662 };
3663 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3664 state.task_sets.insert(key, task_set.clone());
3665 Ok(AwsResponse::ok_json(json!({
3666 "taskSet": task_set_to_json(&task_set),
3667 })))
3668 }
3669
3670 fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3671 let body = request.json_body();
3672 let ts_ref = req_str(&body, "taskSet")?.to_string();
3673 let service_ref = req_str(&body, "service")?;
3674 let service_name = service_name_from_ref(service_ref);
3675 let cluster_ref = opt_str(&body, "cluster");
3676 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3677 let scale = body.get("scale").cloned();
3678
3679 let mut accounts = self.state.write();
3680 let state = accounts
3681 .get_mut(&request.account_id)
3682 .ok_or_else(|| client_exception("task set not found"))?;
3683 let ts_id = task_set_id_from_ref(&ts_ref);
3684 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3685 let ts = state
3686 .task_sets
3687 .get_mut(&key)
3688 .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3689 if let Some(v) = scale {
3690 ts.scale = Some(v);
3691 }
3692 ts.updated_at = Utc::now();
3693 Ok(AwsResponse::ok_json(json!({
3694 "taskSet": task_set_to_json(ts),
3695 })))
3696 }
3697
3698 fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3699 let body = request.json_body();
3700 let ts_ref = req_str(&body, "taskSet")?.to_string();
3701 let service_ref = req_str(&body, "service")?;
3702 let service_name = service_name_from_ref(service_ref);
3703 let cluster_ref = opt_str(&body, "cluster");
3704 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3705 let ts_id = task_set_id_from_ref(&ts_ref);
3706 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3707
3708 let mut accounts = self.state.write();
3709 let state = accounts
3710 .get_mut(&request.account_id)
3711 .ok_or_else(|| client_exception("task set not found"))?;
3712 let mut ts = state
3713 .task_sets
3714 .remove(&key)
3715 .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3716 ts.status = "DRAINING".into();
3717 Ok(AwsResponse::ok_json(json!({
3718 "taskSet": task_set_to_json(&ts),
3719 })))
3720 }
3721
3722 fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3723 let body = request.json_body();
3724 let service_ref = req_str(&body, "service")?;
3725 let service_name = service_name_from_ref(service_ref);
3726 let cluster_ref = opt_str(&body, "cluster");
3727 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3728 let filter_refs: Vec<String> = body
3729 .get("taskSets")
3730 .and_then(|v| v.as_array())
3731 .map(|arr| {
3732 arr.iter()
3733 .filter_map(|v| v.as_str().map(String::from))
3734 .collect()
3735 })
3736 .unwrap_or_default();
3737
3738 let accounts = self.state.read();
3739 let mut found = Vec::new();
3740 let mut failures = Vec::new();
3741 if let Some(state) = accounts.get(&request.account_id) {
3742 if filter_refs.is_empty() {
3743 for ts in state.task_sets.values() {
3744 if ts.cluster_name == cluster_name && ts.service_name == service_name {
3745 found.push(task_set_to_json(ts));
3746 }
3747 }
3748 } else {
3749 for r in &filter_refs {
3750 let id = task_set_id_from_ref(r);
3751 let key = format!("{}/{}/{}", cluster_name, service_name, id);
3752 match state.task_sets.get(&key) {
3753 Some(ts) => found.push(task_set_to_json(ts)),
3754 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3755 }
3756 }
3757 }
3758 }
3759 Ok(AwsResponse::ok_json(json!({
3760 "taskSets": found,
3761 "failures": failures,
3762 })))
3763 }
3764
3765 fn update_service_primary_task_set(
3766 &self,
3767 request: &AwsRequest,
3768 ) -> Result<AwsResponse, AwsServiceError> {
3769 let body = request.json_body();
3770 let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
3771 let service_ref = req_str(&body, "service")?;
3772 let service_name = service_name_from_ref(service_ref);
3773 let cluster_ref = opt_str(&body, "cluster");
3774 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3775 let ts_id = task_set_id_from_ref(&ts_ref);
3776 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3777 let mut accounts = self.state.write();
3778 let state = accounts
3779 .get_mut(&request.account_id)
3780 .ok_or_else(|| client_exception("task set not found"))?;
3781 if !state.task_sets.contains_key(&key) {
3782 return Err(client_exception(format!("task set not found: {}", ts_ref)));
3783 }
3784 for ts in state.task_sets.values_mut() {
3788 if ts.service_name == service_name
3789 && ts.cluster_name == cluster_name
3790 && ts.status == "PRIMARY"
3791 && ts.task_set_id != ts_id
3792 {
3793 ts.status = "ACTIVE".into();
3794 ts.updated_at = Utc::now();
3795 }
3796 }
3797 let ts = state.task_sets.get_mut(&key).unwrap();
3798 ts.status = "PRIMARY".into();
3799 ts.updated_at = Utc::now();
3800 Ok(AwsResponse::ok_json(json!({
3801 "taskSet": task_set_to_json(ts),
3802 })))
3803 }
3804
3805 async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3806 let body = request.json_body();
3807 let task_ref = req_str(&body, "task")?.to_string();
3808 let command = req_str(&body, "command")?.to_string();
3809 let interactive = body
3810 .get("interactive")
3811 .and_then(|v| v.as_bool())
3812 .unwrap_or(false);
3813
3814 let container_id = {
3816 let accounts = self.state.read();
3817 let state = accounts
3818 .get(&request.account_id)
3819 .ok_or_else(|| task_not_found(&task_ref))?;
3820 let id = task_id_from_ref(&task_ref);
3821 state
3822 .tasks
3823 .get(&id)
3824 .and_then(|t| t.containers.first())
3825 .and_then(|c| c.runtime_id.clone())
3826 };
3827
3828 let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
3829 if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
3830 let out = tokio::process::Command::new("docker")
3835 .args(["exec", &id, "sh", "-c", &command])
3836 .output()
3837 .await
3838 .map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
3839 tracing::info!(
3840 task = %task_ref,
3841 exit = out.status.code().unwrap_or(-1),
3842 "ExecuteCommand via docker exec"
3843 );
3844 }
3845
3846 Ok(AwsResponse::ok_json(json!({
3847 "clusterArn": opt_str(&body, "cluster").unwrap_or(""),
3848 "containerArn": container_id.unwrap_or_default(),
3849 "containerName": opt_str(&body, "container").unwrap_or(""),
3850 "interactive": interactive,
3851 "session": {
3852 "sessionId": session_id,
3853 "streamUrl": "",
3854 "tokenValue": "",
3855 },
3856 "taskArn": task_ref,
3857 })))
3858 }
3859
3860 fn submit_container_state_change(
3861 &self,
3862 request: &AwsRequest,
3863 ) -> Result<AwsResponse, AwsServiceError> {
3864 let body = request.json_body();
3869 let task_ref = opt_str(&body, "task").unwrap_or("");
3870 let container_name = opt_str(&body, "containerName").unwrap_or("");
3871 let status = opt_str(&body, "status").map(String::from);
3872 let exit_code = body.get("exitCode").and_then(|v| v.as_i64());
3873 let reason = opt_str(&body, "reason").map(String::from);
3874
3875 if !task_ref.is_empty() {
3876 let mut accounts = self.state.write();
3877 if let Some(state) = accounts.get_mut(&request.account_id) {
3878 let id = task_id_from_ref(task_ref);
3879 if let Some(task) = state.tasks.get_mut(&id) {
3880 if let Some(container) = task
3881 .containers
3882 .iter_mut()
3883 .find(|c| c.name == container_name)
3884 {
3885 if let Some(s) = status {
3886 container.last_status = s;
3887 }
3888 if let Some(code) = exit_code {
3889 container.exit_code = Some(code);
3890 }
3891 if let Some(r) = reason {
3892 container.reason = Some(r);
3893 }
3894 }
3895 }
3896 }
3897 }
3898 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3899 }
3900
3901 fn submit_task_state_change(
3902 &self,
3903 request: &AwsRequest,
3904 ) -> Result<AwsResponse, AwsServiceError> {
3905 let body = request.json_body();
3906 let task_ref = opt_str(&body, "task").unwrap_or("");
3907 let status = opt_str(&body, "status").map(String::from);
3908 if !task_ref.is_empty() {
3909 let mut accounts = self.state.write();
3910 if let Some(state) = accounts.get_mut(&request.account_id) {
3911 let id = task_id_from_ref(task_ref);
3912 if let Some(task) = state.tasks.get_mut(&id) {
3913 if let Some(s) = status {
3914 task.last_status = s;
3915 }
3916 }
3917 }
3918 }
3919 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3920 }
3921
3922 fn submit_attachment_state_changes(
3923 &self,
3924 _request: &AwsRequest,
3925 ) -> Result<AwsResponse, AwsServiceError> {
3926 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3929 }
3930
3931 fn discover_poll_endpoint(
3932 &self,
3933 _request: &AwsRequest,
3934 ) -> Result<AwsResponse, AwsServiceError> {
3935 let accounts = self.state.read();
3939 let endpoint = format!("https://ecs.{}.amazonaws.com/", accounts.region());
3940 Ok(AwsResponse::ok_json(json!({
3941 "endpoint": endpoint,
3942 "telemetryEndpoint": endpoint,
3943 "serviceConnectEndpoint": endpoint,
3944 })))
3945 }
3946
3947 fn stop_service_deployment(
3948 &self,
3949 request: &AwsRequest,
3950 ) -> Result<AwsResponse, AwsServiceError> {
3951 let body = request.json_body();
3952 let deployment_ref = req_str(&body, "serviceDeploymentArn")?.to_string();
3953 let mut accounts = self.state.write();
3954 let state = accounts
3955 .get_mut(&request.account_id)
3956 .ok_or_else(|| client_exception("service deployment not found"))?;
3957 for svc in state.services.values_mut() {
3958 for d in svc.deployments.iter_mut() {
3959 if deployment_ref.contains(&d.deployment_id) {
3960 d.status = "STOPPED".into();
3961 d.rollout_state = "FAILED".into();
3962 d.rollout_state_reason = Some("StopServiceDeployment requested".into());
3963 d.updated_at = Utc::now();
3964 return Ok(AwsResponse::ok_json(json!({
3965 "serviceDeployment": deployment_to_json(d),
3966 })));
3967 }
3968 }
3969 }
3970 Err(client_exception(format!(
3971 "service deployment not found: {deployment_ref}"
3972 )))
3973 }
3974
3975 fn list_service_deployments(
3976 &self,
3977 request: &AwsRequest,
3978 ) -> Result<AwsResponse, AwsServiceError> {
3979 let body = request.json_body();
3980 let service_ref = req_str(&body, "service")?;
3981 let service_name = service_name_from_ref(service_ref);
3982 let cluster_ref = opt_str(&body, "cluster");
3983 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3984 let accounts = self.state.read();
3985 let mut deployments: Vec<Value> = Vec::new();
3986 if let Some(state) = accounts.get(&request.account_id) {
3987 let key = EcsState::service_key(&cluster_name, &service_name);
3988 if let Some(svc) = state.services.get(&key) {
3989 for d in &svc.deployments {
3990 deployments.push(json!({
3991 "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
3992 "serviceArn": svc.service_arn,
3993 "clusterArn": svc.cluster_arn,
3994 "status": d.status,
3995 "createdAt": d.created_at.timestamp(),
3996 "startedAt": d.created_at.timestamp(),
3997 "finishedAt": null,
3998 }));
3999 }
4000 }
4001 }
4002 Ok(AwsResponse::ok_json(json!({
4003 "serviceDeployments": deployments,
4004 })))
4005 }
4006
4007 fn describe_service_deployments(
4008 &self,
4009 request: &AwsRequest,
4010 ) -> Result<AwsResponse, AwsServiceError> {
4011 let body = request.json_body();
4012 let refs: Vec<String> = body
4013 .get("serviceDeploymentArns")
4014 .and_then(|v| v.as_array())
4015 .map(|arr| {
4016 arr.iter()
4017 .filter_map(|v| v.as_str().map(String::from))
4018 .collect()
4019 })
4020 .unwrap_or_default();
4021 let accounts = self.state.read();
4022 let mut found = Vec::new();
4023 let mut failures = Vec::new();
4024 if let Some(state) = accounts.get(&request.account_id) {
4025 'next_ref: for r in &refs {
4026 for svc in state.services.values() {
4027 for d in &svc.deployments {
4028 if r.contains(&d.deployment_id) {
4029 found.push(json!({
4030 "serviceDeploymentArn": r,
4031 "serviceArn": svc.service_arn,
4032 "clusterArn": svc.cluster_arn,
4033 "status": d.status,
4034 "createdAt": d.created_at.timestamp(),
4035 "startedAt": d.created_at.timestamp(),
4036 "finishedAt": null,
4037 "deploymentConfiguration": {
4038 "minimumHealthyPercent": svc.minimum_healthy_percent,
4039 "maximumPercent": svc.maximum_percent,
4040 },
4041 "sourceServiceRevisions": [],
4042 "targetServiceRevision": {
4043 "arn": d.task_definition_arn,
4044 "requestedTaskCount": d.desired_count,
4045 "runningTaskCount": d.running_count,
4046 "pendingTaskCount": d.pending_count,
4047 "failedTasks": d.failed_tasks,
4048 },
4049 }));
4050 continue 'next_ref;
4051 }
4052 }
4053 }
4054 failures.push(json!({"arn": r, "reason": "MISSING"}));
4055 }
4056 }
4057 Ok(AwsResponse::ok_json(json!({
4058 "serviceDeployments": found,
4059 "failures": failures,
4060 })))
4061 }
4062
4063 fn describe_service_revisions(
4064 &self,
4065 request: &AwsRequest,
4066 ) -> Result<AwsResponse, AwsServiceError> {
4067 let body = request.json_body();
4068 let refs: Vec<String> = body
4069 .get("serviceRevisionArns")
4070 .and_then(|v| v.as_array())
4071 .map(|arr| {
4072 arr.iter()
4073 .filter_map(|v| v.as_str().map(String::from))
4074 .collect()
4075 })
4076 .unwrap_or_default();
4077 Ok(AwsResponse::ok_json(json!({
4078 "serviceRevisions": [],
4079 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
4080 })))
4081 }
4082}
4083
/// Returns the text after the last `/` of `input`, or all of `input` when it
/// contains no `/`.
fn container_instance_id_from_ref(input: &str) -> String {
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_owned()
}
4087
4088fn container_instance_not_found(input: &str) -> AwsServiceError {
4089 AwsServiceError::aws_error(
4090 StatusCode::BAD_REQUEST,
4091 "ClientException",
4092 format!("Container instance not found: {input}"),
4093 )
4094}
4095
/// Returns the text after the last `/` of `input`, or all of `input` when it
/// contains no `/`.
fn capacity_provider_name_from_ref(input: &str) -> String {
    let name = match input.rfind('/') {
        // '/' is a single byte, so `slash + 1` is always a char boundary.
        Some(slash) => &input[slash + 1..],
        None => input,
    };
    name.to_string()
}
4099
4100fn capacity_provider_not_found(name: &str) -> AwsServiceError {
4101 AwsServiceError::aws_error(
4102 StatusCode::BAD_REQUEST,
4103 "ClientException",
4104 format!("Capacity provider not found: {name}"),
4105 )
4106}
4107
/// Returns the text after the last `/` of `input`, or all of `input` when it
/// contains no `/`.
fn task_set_id_from_ref(input: &str) -> String {
    // `split` always yields at least one piece; the last one is the id.
    let id = input.split('/').next_back().unwrap_or(input);
    id.to_owned()
}
4111
4112fn container_instance_to_json(ci: &ContainerInstance) -> Value {
4113 json!({
4114 "containerInstanceArn": ci.container_instance_arn,
4115 "ec2InstanceId": ci.ec2_instance_id,
4116 "status": ci.status,
4117 "version": ci.version,
4118 "versionInfo": ci.version_info,
4119 "agentConnected": ci.agent_connected,
4120 "agentUpdateStatus": ci.agent_update_status,
4121 "remainingResources": ci.remaining_resources,
4122 "registeredResources": ci.registered_resources,
4123 "runningTasksCount": ci.running_tasks_count,
4124 "pendingTasksCount": ci.pending_tasks_count,
4125 "registeredAt": ci.registered_at.timestamp(),
4126 "attributes": ci.attributes.iter().map(|a| json!({
4127 "name": a.name,
4128 "value": a.value,
4129 "targetType": a.target_type,
4130 "targetId": a.target_id,
4131 })).collect::<Vec<_>>(),
4132 "tags": tags_json(&ci.tags),
4133 "capacityProviderName": ci.capacity_provider_name,
4134 "healthStatus": ci.health_status,
4135 })
4136}
4137
4138fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
4139 json!({
4140 "name": cp.name,
4141 "capacityProviderArn": cp.arn,
4142 "status": cp.status,
4143 "autoScalingGroupProvider": cp.auto_scaling_group_provider,
4144 "updateStatus": cp.update_status,
4145 "updateStatusReason": cp.update_status_reason,
4146 "tags": tags_json(&cp.tags),
4147 })
4148}
4149
4150fn task_set_to_json(ts: &TaskSet) -> Value {
4151 json!({
4152 "id": ts.task_set_id,
4153 "taskSetArn": ts.task_set_arn,
4154 "serviceArn": ts.service_arn,
4155 "clusterArn": ts.cluster_arn,
4156 "externalId": ts.external_id,
4157 "status": ts.status,
4158 "taskDefinition": ts.task_definition,
4159 "computedDesiredCount": ts.computed_desired_count,
4160 "pendingCount": ts.pending_count,
4161 "runningCount": ts.running_count,
4162 "launchType": ts.launch_type,
4163 "platformVersion": ts.platform_version,
4164 "scale": ts.scale,
4165 "stabilityStatus": ts.stability_status,
4166 "createdAt": ts.created_at.timestamp(),
4167 "updatedAt": ts.updated_at.timestamp(),
4168 "loadBalancers": ts.load_balancers,
4169 "serviceRegistries": ts.service_registries,
4170 "capacityProviderStrategy": ts.capacity_provider_strategy,
4171 "tags": tags_json(&ts.tags),
4172 })
4173}
4174
4175fn task_protection_json(task: &Task) -> Value {
4176 let p = task.protection.as_ref();
4177 json!({
4178 "taskArn": task.task_arn,
4179 "protectionEnabled": p.map(|p| p.enabled).unwrap_or(false),
4180 "expirationDate": p.and_then(|p| p.expiration).map(|e| e.timestamp()),
4181 })
4182}
4183
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tag entry from string slices.
    fn tag(key: &str, value: &str) -> TagEntry {
        TagEntry {
            key: key.into(),
            value: value.into(),
        }
    }

    /// Minimal `api` service living in the `default` cluster.
    fn sample_service() -> Service {
        Service {
            service_name: "api".into(),
            service_arn: "arn".into(),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            task_definition_arn: "td-arn".into(),
            family: "td".into(),
            revision: 1,
            desired_count: 0,
            running_count: 0,
            pending_count: 0,
            launch_type: "FARGATE".into(),
            status: "ACTIVE".into(),
            scheduling_strategy: "REPLICA".into(),
            deployment_controller: "ECS".into(),
            minimum_healthy_percent: None,
            maximum_percent: None,
            circuit_breaker: None,
            deployments: vec![],
            load_balancers: vec![],
            service_registries: vec![],
            placement_constraints: vec![],
            placement_strategy: vec![],
            network_configuration: None,
            tags: vec![],
            created_at: chrono::Utc::now(),
            created_by: None,
            role_arn: None,
        }
    }

    /// Minimal active container instance living in the `default` cluster.
    fn sample_container_instance() -> ContainerInstance {
        ContainerInstance {
            container_instance_arn: "arn".into(),
            ec2_instance_id: Some("i-x".into()),
            cluster_name: "default".into(),
            cluster_arn: "arn".into(),
            status: "ACTIVE".into(),
            version: 0,
            version_info: None,
            agent_connected: true,
            agent_update_status: None,
            remaining_resources: vec![],
            registered_resources: vec![],
            running_tasks_count: 0,
            pending_tasks_count: 0,
            registered_at: chrono::Utc::now(),
            attributes: vec![],
            tags: vec![],
            capacity_provider_name: None,
            health_status: None,
        }
    }

    #[test]
    fn parse_family_revision_with_revision() {
        let (family, revision) = parse_family_revision("web:3");
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn parse_family_revision_without_revision() {
        let (family, revision) = parse_family_revision("web");
        assert_eq!(family, "web");
        assert_eq!(revision, None);
    }

    #[test]
    fn parse_family_revision_non_numeric_treated_as_no_revision() {
        // A non-numeric suffix stays part of the family name.
        let (family, revision) = parse_family_revision("web:latest");
        assert_eq!(family, "web:latest");
        assert_eq!(revision, None);
    }

    #[test]
    fn decode_ecs_arn_cluster() {
        let (owner, kind, rest) =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod").unwrap();
        assert_eq!(owner, "111122223333");
        assert_eq!(kind, "cluster");
        assert_eq!(rest, "prod");
    }

    #[test]
    fn decode_ecs_arn_task_definition() {
        let (owner, kind, rest) =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5").unwrap();
        assert_eq!(owner, "111122223333");
        assert_eq!(kind, "task-definition");
        assert_eq!(rest, "web:5");
    }

    #[test]
    fn decode_ecs_arn_rejects_non_ecs() {
        let outcome = decode_ecs_arn("arn:aws:s3:::bucket");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_service_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .services
            .insert("default/api".to_string(), sample_service());

        // Both the `cluster/name` form and the bare name resolve to the key.
        for reference in ["default/api", "api"] {
            assert_eq!(
                resolve_service_key(&state, reference),
                Some("default/api".to_string())
            );
        }
        assert_eq!(resolve_service_key(&state, "nope"), None);
    }

    #[test]
    fn resolve_container_instance_key_handles_short_and_long() {
        let mut state = EcsState::new("123456789012", "us-east-1");
        state
            .container_instances
            .insert("default/abc-123".to_string(), sample_container_instance());

        // Both the `cluster/id` form and the bare id resolve to the key.
        for reference in ["default/abc-123", "abc-123"] {
            assert_eq!(
                resolve_container_instance_key(&state, reference),
                Some("default/abc-123".to_string())
            );
        }
        assert_eq!(resolve_container_instance_key(&state, "nope"), None);
    }

    #[test]
    fn validate_family_name_accepts_hyphen_underscore() {
        let outcome = validate_family_name("web_server-2");
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_family_name_rejects_empty() {
        let outcome = validate_family_name("");
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_family_name_rejects_slash() {
        let outcome = validate_family_name("web/server");
        assert!(outcome.is_err());
    }

    #[test]
    fn resolve_task_definition_ref_bare_family() {
        let (owner, family, revision) = resolve_task_definition_ref("web").unwrap();
        assert_eq!(owner, None);
        assert_eq!(family, "web");
        assert_eq!(revision, None);
    }

    #[test]
    fn resolve_task_definition_ref_family_revision() {
        let (owner, family, revision) = resolve_task_definition_ref("web:3").unwrap();
        assert_eq!(owner, None);
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn resolve_task_definition_ref_full_arn() {
        let (owner, family, revision) =
            resolve_task_definition_ref("arn:aws:ecs:us-east-1:111122223333:task-definition/web:3")
                .unwrap();
        assert_eq!(owner, Some("111122223333".to_string()));
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn merge_tags_replaces_existing_value() {
        let mut existing = vec![tag("env", "dev")];
        merge_tags(&mut existing, vec![tag("env", "prod")]);
        assert_eq!(existing.len(), 1);
        assert_eq!(existing[0].value, "prod");
    }

    #[test]
    fn merge_tags_adds_new() {
        let mut existing = vec![tag("env", "dev")];
        merge_tags(&mut existing, vec![tag("team", "platform")]);
        assert_eq!(existing.len(), 2);
    }

    #[test]
    fn parse_tags_reads_lowercase_keys() {
        let payload = json!({
            "tags": [
                {"key": "env", "value": "prod"},
                {"key": "team", "value": "platform"},
            ]
        });
        let parsed = parse_tags(&payload);
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].key, "env");
        assert_eq!(parsed[0].value, "prod");
    }

    #[test]
    fn matches_filter_respects_none() {
        // No filter accepts anything; a filter requires an exact match.
        assert!(matches_filter(None, "anything"));
        assert!(matches_filter(Some("x"), "x"));
        assert!(!matches_filter(Some("x"), "y"));
    }
}