1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
10use fakecloud_persistence::SnapshotStore;
11
12use crate::state::{
13 Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
14 Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
15 TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
16};
17
/// Names of every ECS action this service dispatches, grouped by resource
/// family. This slice is what `supported_actions` returns.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Clusters
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    // Task definitions
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Account settings
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    // Tasks
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    // Services
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    // Container instances
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    // Attributes
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    // Capacity providers
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    // Task protection
    "GetTaskProtection",
    "UpdateTaskProtection",
    // Task sets
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    // Exec and agent-facing calls
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    // Service deployments and revisions
    "StopServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
];
80
/// Reports whether `action` is one of the state-changing ECS actions.
///
/// `handle` uses this to decide whether a successful call should be followed
/// by a snapshot write. Matching is exact and case-sensitive.
fn is_mutating(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateCluster",
        "DeleteCluster",
        "UpdateCluster",
        "UpdateClusterSettings",
        "PutClusterCapacityProviders",
        "RegisterTaskDefinition",
        "DeregisterTaskDefinition",
        "DeleteTaskDefinitions",
        "TagResource",
        "UntagResource",
        "PutAccountSetting",
        "PutAccountSettingDefault",
        "DeleteAccountSetting",
        "RunTask",
        "StartTask",
        "StopTask",
        "CreateService",
        "UpdateService",
        "DeleteService",
        "RegisterContainerInstance",
        "DeregisterContainerInstance",
        "UpdateContainerAgent",
        "UpdateContainerInstancesState",
        "PutAttributes",
        "DeleteAttributes",
        "CreateCapacityProvider",
        "DeleteCapacityProvider",
        "UpdateCapacityProvider",
        "UpdateTaskProtection",
        "CreateTaskSet",
        "UpdateTaskSet",
        "DeleteTaskSet",
        "UpdateServicePrimaryTaskSet",
        "SubmitContainerStateChange",
        "SubmitTaskStateChange",
        "SubmitAttachmentStateChanges",
        "StopServiceDeployment",
    ];
    MUTATING_ACTIONS.iter().any(|&name| name == action)
}
123
124pub struct EcsService {
125 state: SharedEcsState,
126 snapshot_store: Option<Arc<dyn SnapshotStore>>,
127 snapshot_lock: Arc<AsyncMutex<()>>,
128 runtime: Option<Arc<crate::runtime::EcsRuntime>>,
129 role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
130}
131
132impl EcsService {
133 pub fn new(state: SharedEcsState) -> Self {
134 Self {
135 state,
136 snapshot_store: None,
137 snapshot_lock: Arc::new(AsyncMutex::new(())),
138 runtime: None,
139 role_trust_validator: None,
140 }
141 }
142
143 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
144 self.snapshot_store = Some(store);
145 self
146 }
147
148 pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
149 self.runtime = Some(runtime);
150 self
151 }
152
153 pub fn with_role_trust_validator(
154 mut self,
155 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
156 ) -> Self {
157 self.role_trust_validator = Some(validator);
158 self
159 }
160
161 fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
162 let Some(ref validator) = self.role_trust_validator else {
163 return Ok(());
164 };
165 if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
166 return Err(AwsServiceError::aws_error(
167 StatusCode::BAD_REQUEST,
168 "InvalidParameterException",
169 err.to_string(),
170 ));
171 }
172 Ok(())
173 }
174
175 pub fn state_handle(&self) -> &SharedEcsState {
176 &self.state
177 }
178
179 async fn save_snapshot(&self) {
180 let Some(store) = self.snapshot_store.clone() else {
181 return;
182 };
183 let _guard = self.snapshot_lock.lock().await;
184 let snapshot = EcsSnapshot {
185 schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
186 accounts: Some(self.state.read().clone()),
187 };
188 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
189 let bytes = serde_json::to_vec(&snapshot)
190 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
191 store.save(&bytes)
192 })
193 .await;
194 match join {
195 Ok(Ok(())) => {}
196 Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
197 Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
198 }
199 }
200}
201
202#[async_trait]
203impl AwsService for EcsService {
204 fn service_name(&self) -> &str {
205 "ecs"
206 }
207
208 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
209 let mutates = is_mutating(request.action.as_str());
210 let result = match request.action.as_str() {
211 "CreateCluster" => self.create_cluster(&request),
212 "DescribeClusters" => self.describe_clusters(&request),
213 "DeleteCluster" => self.delete_cluster(&request),
214 "ListClusters" => self.list_clusters(&request),
215 "UpdateCluster" => self.update_cluster(&request),
216 "UpdateClusterSettings" => self.update_cluster_settings(&request),
217 "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
218 "RegisterTaskDefinition" => self.register_task_definition(&request),
219 "DescribeTaskDefinition" => self.describe_task_definition(&request),
220 "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
221 "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
222 "ListTaskDefinitions" => self.list_task_definitions(&request),
223 "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
224 "TagResource" => self.tag_resource(&request),
225 "UntagResource" => self.untag_resource(&request),
226 "ListTagsForResource" => self.list_tags_for_resource(&request),
227 "PutAccountSetting" => self.put_account_setting(&request),
228 "PutAccountSettingDefault" => self.put_account_setting_default(&request),
229 "DeleteAccountSetting" => self.delete_account_setting(&request),
230 "ListAccountSettings" => self.list_account_settings(&request),
231 "RunTask" => self.run_task(&request),
232 "StartTask" => self.start_task(&request),
233 "StopTask" => self.stop_task(&request).await,
234 "DescribeTasks" => self.describe_tasks(&request),
235 "ListTasks" => self.list_tasks(&request),
236 "CreateService" => self.create_service(&request),
237 "UpdateService" => self.update_service(&request),
238 "DeleteService" => self.delete_service(&request).await,
239 "DescribeServices" => self.describe_services(&request),
240 "ListServices" => self.list_services(&request),
241 "ListServicesByNamespace" => self.list_services_by_namespace(&request),
242 "RegisterContainerInstance" => self.register_container_instance(&request),
243 "DeregisterContainerInstance" => self.deregister_container_instance(&request),
244 "DescribeContainerInstances" => self.describe_container_instances(&request),
245 "ListContainerInstances" => self.list_container_instances(&request),
246 "UpdateContainerAgent" => self.update_container_agent(&request),
247 "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
248 "PutAttributes" => self.put_attributes(&request),
249 "DeleteAttributes" => self.delete_attributes(&request),
250 "ListAttributes" => self.list_attributes(&request),
251 "CreateCapacityProvider" => self.create_capacity_provider(&request),
252 "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
253 "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
254 "UpdateCapacityProvider" => self.update_capacity_provider(&request),
255 "GetTaskProtection" => self.get_task_protection(&request),
256 "UpdateTaskProtection" => self.update_task_protection(&request),
257 "CreateTaskSet" => self.create_task_set(&request),
258 "UpdateTaskSet" => self.update_task_set(&request),
259 "DeleteTaskSet" => self.delete_task_set(&request),
260 "DescribeTaskSets" => self.describe_task_sets(&request),
261 "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
262 "ExecuteCommand" => self.execute_command(&request).await,
263 "SubmitContainerStateChange" => self.submit_container_state_change(&request),
264 "SubmitTaskStateChange" => self.submit_task_state_change(&request),
265 "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
266 "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
267 "StopServiceDeployment" => self.stop_service_deployment(&request),
268 "ListServiceDeployments" => self.list_service_deployments(&request),
269 "DescribeServiceDeployments" => self.describe_service_deployments(&request),
270 "DescribeServiceRevisions" => self.describe_service_revisions(&request),
271 _ => Err(AwsServiceError::action_not_implemented(
272 "ecs",
273 &request.action,
274 )),
275 };
276 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
277 self.save_snapshot().await;
278 }
279 result
280 }
281
282 fn supported_actions(&self) -> &[&str] {
283 SUPPORTED_ACTIONS
284 }
285}
286
287fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
290 body.get(field)
291 .and_then(|v| v.as_str())
292 .ok_or_else(|| client_exception(format!("Missing required field: {field}")))
293}
294
295fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
296 body.get(field).and_then(|v| v.as_str())
297}
298
299fn client_exception(message: impl Into<String>) -> AwsServiceError {
300 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "ClientException", message)
301}
302
303fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
304 AwsServiceError::aws_error(
305 StatusCode::BAD_REQUEST,
306 "InvalidParameterException",
307 message,
308 )
309}
310
311fn cluster_not_found(name: &str) -> AwsServiceError {
312 AwsServiceError::aws_error(
313 StatusCode::BAD_REQUEST,
314 "ClusterNotFoundException",
315 format!("The referenced cluster was inactive: {name}"),
316 )
317}
318
319fn cluster_contains_services() -> AwsServiceError {
320 AwsServiceError::aws_error(
321 StatusCode::BAD_REQUEST,
322 "ClusterContainsServicesException",
323 "The specified cluster still contains active services",
324 )
325}
326
327fn cluster_contains_tasks() -> AwsServiceError {
328 AwsServiceError::aws_error(
329 StatusCode::BAD_REQUEST,
330 "ClusterContainsTasksException",
331 "The specified cluster still contains active tasks",
332 )
333}
334
335fn task_definition_not_found(family_rev: &str) -> AwsServiceError {
336 AwsServiceError::aws_error(
337 StatusCode::BAD_REQUEST,
338 "ClientException",
339 format!("Unable to describe task definition: {family_rev}"),
340 )
341}
342
343fn parse_tags(body: &Value) -> Vec<TagEntry> {
344 body.get("tags")
345 .and_then(|v| v.as_array())
346 .map(|arr| {
347 arr.iter()
348 .filter_map(|t| {
349 let k = t.get("key").and_then(|v| v.as_str())?;
350 let v = t.get("value").and_then(|v| v.as_str()).unwrap_or("");
351 Some(TagEntry {
352 key: k.to_string(),
353 value: v.to_string(),
354 })
355 })
356 .collect()
357 })
358 .unwrap_or_default()
359}
360
361fn tags_json(tags: &[TagEntry]) -> Value {
362 Value::Array(
363 tags.iter()
364 .map(|t| json!({"key": t.key, "value": t.value}))
365 .collect(),
366 )
367}
368
369fn merge_tags(current: &mut Vec<TagEntry>, incoming: Vec<TagEntry>) {
370 for new_tag in incoming {
371 if let Some(existing) = current.iter_mut().find(|t| t.key == new_tag.key) {
372 existing.value = new_tag.value;
373 } else {
374 current.push(new_tag);
375 }
376 }
377}
378
379fn cluster_to_json(cluster: &Cluster) -> Value {
380 json!({
381 "clusterArn": cluster.cluster_arn,
382 "clusterName": cluster.cluster_name,
383 "status": cluster.status,
384 "registeredContainerInstancesCount": cluster.registered_container_instances_count,
385 "runningTasksCount": cluster.running_tasks_count,
386 "pendingTasksCount": cluster.pending_tasks_count,
387 "activeServicesCount": cluster.active_services_count,
388 "statistics": cluster.statistics,
389 "tags": tags_json(&cluster.tags),
390 "settings": cluster.settings,
391 "configuration": cluster.configuration,
392 "capacityProviders": cluster.capacity_providers,
393 "defaultCapacityProviderStrategy": cluster.default_capacity_provider_strategy,
394 "attachments": cluster.attachments,
395 "attachmentsStatus": cluster.attachments_status,
396 "serviceConnectDefaults": cluster.service_connect_defaults,
397 })
398}
399
400fn task_definition_to_json(td: &TaskDefinition) -> Value {
401 let mut map = Map::new();
402 map.insert("taskDefinitionArn".into(), json!(td.task_definition_arn));
403 map.insert("family".into(), json!(td.family));
404 map.insert("revision".into(), json!(td.revision));
405 map.insert("status".into(), json!(td.status));
406 map.insert(
407 "containerDefinitions".into(),
408 Value::Array(td.container_definitions.clone()),
409 );
410 map.insert("compatibilities".into(), json!(td.compatibilities));
411 map.insert(
412 "requiresCompatibilities".into(),
413 json!(td.requires_compatibilities),
414 );
415 map.insert("volumes".into(), Value::Array(td.volumes.clone()));
416 map.insert(
417 "placementConstraints".into(),
418 Value::Array(td.placement_constraints.clone()),
419 );
420 map.insert(
421 "requiresAttributes".into(),
422 Value::Array(td.requires_attributes.clone()),
423 );
424 map.insert(
425 "inferenceAccelerators".into(),
426 Value::Array(td.inference_accelerators.clone()),
427 );
428 if let Some(ref x) = td.network_mode {
429 map.insert("networkMode".into(), json!(x));
430 }
431 if let Some(ref x) = td.cpu {
432 map.insert("cpu".into(), json!(x));
433 }
434 if let Some(ref x) = td.memory {
435 map.insert("memory".into(), json!(x));
436 }
437 if let Some(ref x) = td.task_role_arn {
438 map.insert("taskRoleArn".into(), json!(x));
439 }
440 if let Some(ref x) = td.execution_role_arn {
441 map.insert("executionRoleArn".into(), json!(x));
442 }
443 if let Some(ref x) = td.pid_mode {
444 map.insert("pidMode".into(), json!(x));
445 }
446 if let Some(ref x) = td.ipc_mode {
447 map.insert("ipcMode".into(), json!(x));
448 }
449 if let Some(ref x) = td.proxy_configuration {
450 map.insert("proxyConfiguration".into(), x.clone());
451 }
452 if let Some(ref x) = td.ephemeral_storage {
453 map.insert("ephemeralStorage".into(), x.clone());
454 }
455 if let Some(ref x) = td.runtime_platform {
456 map.insert("runtimePlatform".into(), x.clone());
457 }
458 if let Some(ref x) = td.registered_by {
459 map.insert("registeredBy".into(), json!(x));
460 }
461 map.insert("registeredAt".into(), json!(td.registered_at.timestamp()));
462 if let Some(ts) = td.deregistered_at {
463 map.insert("deregisteredAt".into(), json!(ts.timestamp()));
464 }
465 if let Some(enabled) = td.enable_fault_injection {
466 map.insert("enableFaultInjection".into(), json!(enabled));
467 }
468 Value::Object(map)
469}
470
471fn decode_ecs_arn(arn: &str) -> Result<(String, String, String), AwsServiceError> {
475 let rest = arn
476 .strip_prefix("arn:aws:ecs:")
477 .ok_or_else(|| invalid_parameter(format!("Malformed ECS ARN: {arn}")))?;
478 let mut parts = rest.splitn(3, ':');
481 let _region = parts
482 .next()
483 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
484 let account = parts
485 .next()
486 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
487 let resource = parts
488 .next()
489 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
490 let (resource_type, tail) = resource
491 .split_once('/')
492 .ok_or_else(|| invalid_parameter("Malformed ECS ARN"))?;
493 Ok((
494 account.to_string(),
495 resource_type.to_string(),
496 tail.to_string(),
497 ))
498}
499
/// Splits a `family:revision` reference into its family and revision.
///
/// The text after the last `:` counts as a revision only when it consists
/// solely of ASCII digits and fits in an `i32`. Otherwise the whole input is
/// returned as the family with no revision. Sign-prefixed suffixes such as
/// `+3` or `-1` are therefore not read as revisions, even though
/// `str::parse::<i32>` would accept them.
fn parse_family_revision(input: &str) -> (String, Option<i32>) {
    if let Some((family, rev)) = input.rsplit_once(':') {
        let digits_only = !rev.is_empty() && rev.bytes().all(|b| b.is_ascii_digit());
        if digits_only {
            // Can still fail on overflow; fall through to "no revision".
            if let Ok(n) = rev.parse::<i32>() {
                return (family.to_string(), Some(n));
            }
        }
    }
    (input.to_string(), None)
}
511
512fn resolve_task_definition_ref(
516 input: &str,
517) -> Result<(Option<String>, String, Option<i32>), AwsServiceError> {
518 if input.starts_with("arn:aws:ecs:") {
519 let (account, resource_type, tail) = decode_ecs_arn(input)?;
520 if resource_type != "task-definition" {
521 return Err(invalid_parameter(format!(
522 "Expected task-definition ARN: {input}"
523 )));
524 }
525 let (family, rev) = parse_family_revision(&tail);
526 Ok((Some(account), family, rev))
527 } else {
528 let (family, rev) = parse_family_revision(input);
529 Ok((None, family, rev))
530 }
531}
532
533fn target_account_for_task_definition(request: &AwsRequest, td_ref: &str) -> String {
534 if let Ok((Some(account), _, _)) = resolve_task_definition_ref(td_ref) {
535 account
536 } else {
537 request.account_id.clone()
538 }
539}
540
541fn target_account_for_cluster(request: &AwsRequest, cluster_ref: Option<&str>) -> String {
542 if let Some(input) = cluster_ref {
543 if input.starts_with("arn:aws:ecs:") {
544 if let Ok((account, _, _)) = decode_ecs_arn(input) {
545 return account;
546 }
547 }
548 }
549 request.account_id.clone()
550}
551
552fn latest_active_revision(
553 revisions: &std::collections::BTreeMap<i32, TaskDefinition>,
554) -> Option<&TaskDefinition> {
555 revisions.values().rev().find(|td| td.status == "ACTIVE")
556}
557
558impl EcsService {
561 fn create_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
562 let body = request.json_body();
563 let cluster_name = opt_str(&body, "clusterName")
564 .unwrap_or("default")
565 .to_string();
566 let tags = parse_tags(&body);
567 let settings: Vec<Value> = body
568 .get("settings")
569 .and_then(|v| v.as_array())
570 .cloned()
571 .unwrap_or_default();
572 let configuration = body.get("configuration").cloned();
573 let capacity_providers: Vec<String> = body
574 .get("capacityProviders")
575 .and_then(|v| v.as_array())
576 .map(|arr| {
577 arr.iter()
578 .filter_map(|v| v.as_str().map(String::from))
579 .collect()
580 })
581 .unwrap_or_default();
582 let default_strategy: Vec<Value> = body
583 .get("defaultCapacityProviderStrategy")
584 .and_then(|v| v.as_array())
585 .cloned()
586 .unwrap_or_default();
587 let service_connect = body.get("serviceConnectDefaults").cloned();
588
589 let account = request.account_id.clone();
590 let mut accounts = self.state.write();
591 let state = accounts.get_or_create(&account);
592 let arn = state.cluster_arn(&cluster_name);
593 let mut cluster = Cluster::new(&cluster_name, arn);
594 cluster.tags = tags;
595 cluster.settings = settings;
596 cluster.configuration = configuration;
597 cluster.capacity_providers = capacity_providers;
598 cluster.default_capacity_provider_strategy = default_strategy;
599 cluster.service_connect_defaults = service_connect;
600 state.clusters.insert(cluster_name.clone(), cluster.clone());
604
605 Ok(AwsResponse::ok_json(json!({
606 "cluster": cluster_to_json(&cluster),
607 })))
608 }
609
610 fn describe_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
611 let body = request.json_body();
612 let names: Vec<String> = body
613 .get("clusters")
614 .and_then(|v| v.as_array())
615 .map(|arr| {
616 arr.iter()
617 .filter_map(|v| v.as_str().map(|s| EcsState::resolve_cluster_name(Some(s))))
618 .collect()
619 })
620 .unwrap_or_else(|| vec!["default".to_string()]);
621
622 let account = request.account_id.clone();
623 let accounts = self.state.read();
624 let mut found = Vec::new();
625 let mut failures = Vec::new();
626 if let Some(state) = accounts.get(&account) {
627 for name in &names {
628 match state.clusters.get(name) {
629 Some(c) => found.push(cluster_to_json(c)),
630 None => failures.push(json!({
631 "arn": state.cluster_arn(name),
632 "reason": "MISSING",
633 })),
634 }
635 }
636 } else {
637 for name in &names {
638 failures.push(json!({
639 "arn": format!(
640 "arn:aws:ecs:{}:{}:cluster/{}",
641 accounts.region(),
642 account,
643 name
644 ),
645 "reason": "MISSING",
646 }));
647 }
648 }
649 Ok(AwsResponse::ok_json(json!({
650 "clusters": found,
651 "failures": failures,
652 })))
653 }
654
655 fn delete_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
656 let body = request.json_body();
657 let cluster_ref = opt_str(&body, "cluster");
658 let name = EcsState::resolve_cluster_name(cluster_ref);
659 let account = target_account_for_cluster(request, cluster_ref);
660
661 let mut accounts = self.state.write();
662 let state = accounts.get_or_create(&account);
663 let cluster = state
664 .clusters
665 .get_mut(&name)
666 .ok_or_else(|| cluster_not_found(&name))?;
667 if cluster.active_services_count > 0 {
668 return Err(cluster_contains_services());
669 }
670 if cluster.running_tasks_count > 0 || cluster.pending_tasks_count > 0 {
671 return Err(cluster_contains_tasks());
672 }
673 cluster.status = "INACTIVE".to_string();
674 let snapshot = cluster.clone();
675 state.clusters.remove(&name);
680 Ok(AwsResponse::ok_json(json!({
681 "cluster": cluster_to_json(&snapshot),
682 })))
683 }
684
685 fn list_clusters(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
686 let body = request.json_body();
687 let max_results = body
688 .get("maxResults")
689 .and_then(|v| v.as_i64())
690 .filter(|n| (1..=100).contains(n))
691 .map(|n| n as usize)
692 .unwrap_or(100);
693 let next_token = opt_str(&body, "nextToken").unwrap_or("");
694
695 let account = request.account_id.clone();
696 let accounts = self.state.read();
697 let arns: Vec<String> = match accounts.get(&account) {
698 Some(state) => state
699 .clusters
700 .values()
701 .map(|c| c.cluster_arn.clone())
702 .collect(),
703 None => Vec::new(),
704 };
705 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
706 let end = (start + max_results).min(arns.len());
707 let page = arns[start..end].to_vec();
708 let next = if end < arns.len() {
709 Some(end.to_string())
710 } else {
711 None
712 };
713 let mut out = json!({ "clusterArns": page });
714 if let Some(n) = next {
715 out.as_object_mut()
716 .unwrap()
717 .insert("nextToken".into(), json!(n));
718 }
719 Ok(AwsResponse::ok_json(out))
720 }
721
722 fn update_cluster(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
723 let body = request.json_body();
724 let cluster_ref = req_str(&body, "cluster")?;
725 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
726 let account = target_account_for_cluster(request, Some(cluster_ref));
727
728 let mut accounts = self.state.write();
729 let state = accounts.get_or_create(&account);
730 let cluster = state
731 .clusters
732 .get_mut(&name)
733 .ok_or_else(|| cluster_not_found(&name))?;
734 if let Some(settings) = body.get("settings").and_then(|v| v.as_array()) {
735 cluster.settings = settings.clone();
736 }
737 if let Some(cfg) = body.get("configuration") {
738 cluster.configuration = Some(cfg.clone());
739 }
740 if let Some(sc) = body.get("serviceConnectDefaults") {
741 cluster.service_connect_defaults = Some(sc.clone());
742 }
743 let snapshot = cluster.clone();
744 Ok(AwsResponse::ok_json(json!({
745 "cluster": cluster_to_json(&snapshot),
746 })))
747 }
748
749 fn update_cluster_settings(
750 &self,
751 request: &AwsRequest,
752 ) -> Result<AwsResponse, AwsServiceError> {
753 let body = request.json_body();
754 let cluster_ref = req_str(&body, "cluster")?;
755 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
756 let account = target_account_for_cluster(request, Some(cluster_ref));
757 let settings: Vec<Value> = body
758 .get("settings")
759 .and_then(|v| v.as_array())
760 .cloned()
761 .unwrap_or_default();
762
763 let mut accounts = self.state.write();
764 let state = accounts.get_or_create(&account);
765 let cluster = state
766 .clusters
767 .get_mut(&name)
768 .ok_or_else(|| cluster_not_found(&name))?;
769 cluster.settings = settings;
770 let snapshot = cluster.clone();
771 Ok(AwsResponse::ok_json(json!({
772 "cluster": cluster_to_json(&snapshot),
773 })))
774 }
775
776 fn put_cluster_capacity_providers(
777 &self,
778 request: &AwsRequest,
779 ) -> Result<AwsResponse, AwsServiceError> {
780 let body = request.json_body();
781 let cluster_ref = req_str(&body, "cluster")?;
782 let name = EcsState::resolve_cluster_name(Some(cluster_ref));
783 let account = target_account_for_cluster(request, Some(cluster_ref));
784 let capacity_providers: Vec<String> = body
785 .get("capacityProviders")
786 .and_then(|v| v.as_array())
787 .map(|arr| {
788 arr.iter()
789 .filter_map(|v| v.as_str().map(String::from))
790 .collect()
791 })
792 .ok_or_else(|| client_exception("Missing required field: capacityProviders"))?;
793 let default_strategy: Vec<Value> = body
794 .get("defaultCapacityProviderStrategy")
795 .and_then(|v| v.as_array())
796 .cloned()
797 .ok_or_else(|| {
798 client_exception("Missing required field: defaultCapacityProviderStrategy")
799 })?;
800
801 let mut accounts = self.state.write();
802 let state = accounts.get_or_create(&account);
803 let cluster = state
804 .clusters
805 .get_mut(&name)
806 .ok_or_else(|| cluster_not_found(&name))?;
807 cluster.capacity_providers = capacity_providers;
808 cluster.default_capacity_provider_strategy = default_strategy;
809 let snapshot = cluster.clone();
810 Ok(AwsResponse::ok_json(json!({
811 "cluster": cluster_to_json(&snapshot),
812 })))
813 }
814}
815
816impl EcsService {
819 fn register_task_definition(
820 &self,
821 request: &AwsRequest,
822 ) -> Result<AwsResponse, AwsServiceError> {
823 let body = request.json_body();
824 let family = req_str(&body, "family")?.to_string();
825 validate_family_name(&family)?;
826 let container_definitions = body
827 .get("containerDefinitions")
828 .and_then(|v| v.as_array())
829 .cloned()
830 .ok_or_else(|| client_exception("Missing required field: containerDefinitions"))?;
831 if container_definitions.is_empty() {
832 return Err(client_exception(
833 "Task definition must have at least one container",
834 ));
835 }
836 for cd in &container_definitions {
837 if cd
838 .get("name")
839 .and_then(|v| v.as_str())
840 .unwrap_or("")
841 .is_empty()
842 {
843 return Err(client_exception(
844 "Container definition is missing required field: name",
845 ));
846 }
847 if cd
848 .get("image")
849 .and_then(|v| v.as_str())
850 .unwrap_or("")
851 .is_empty()
852 {
853 return Err(client_exception(
854 "Container definition is missing required field: image",
855 ));
856 }
857 }
858 if let Some(role_arn) = opt_str(&body, "taskRoleArn") {
862 self.check_pass_role(&request.account_id, role_arn)?;
863 }
864 if let Some(role_arn) = opt_str(&body, "executionRoleArn") {
865 self.check_pass_role(&request.account_id, role_arn)?;
866 }
867
868 let tags = parse_tags(&body);
869 let requires_compatibilities: Vec<String> = body
870 .get("requiresCompatibilities")
871 .and_then(|v| v.as_array())
872 .map(|arr| {
873 arr.iter()
874 .filter_map(|v| v.as_str().map(String::from))
875 .collect()
876 })
877 .unwrap_or_default();
878 let compatibilities = vec!["EC2".to_string(), "FARGATE".to_string()];
882
883 let account = request.account_id.clone();
884 let mut accounts = self.state.write();
885 let state = accounts.get_or_create(&account);
886 let revision = state.allocate_revision(&family);
887 let arn = state.task_definition_arn(&family, revision);
888 let td = TaskDefinition {
889 family: family.clone(),
890 revision,
891 task_definition_arn: arn,
892 container_definitions,
893 status: "ACTIVE".to_string(),
894 task_role_arn: opt_str(&body, "taskRoleArn").map(String::from),
895 execution_role_arn: opt_str(&body, "executionRoleArn").map(String::from),
896 network_mode: opt_str(&body, "networkMode").map(String::from),
897 requires_compatibilities,
898 compatibilities,
899 cpu: opt_str(&body, "cpu").map(String::from),
900 memory: opt_str(&body, "memory").map(String::from),
901 pid_mode: opt_str(&body, "pidMode").map(String::from),
902 ipc_mode: opt_str(&body, "ipcMode").map(String::from),
903 volumes: body
904 .get("volumes")
905 .and_then(|v| v.as_array())
906 .cloned()
907 .unwrap_or_default(),
908 placement_constraints: body
909 .get("placementConstraints")
910 .and_then(|v| v.as_array())
911 .cloned()
912 .unwrap_or_default(),
913 proxy_configuration: body.get("proxyConfiguration").cloned(),
914 inference_accelerators: body
915 .get("inferenceAccelerators")
916 .and_then(|v| v.as_array())
917 .cloned()
918 .unwrap_or_default(),
919 ephemeral_storage: body.get("ephemeralStorage").cloned(),
920 runtime_platform: body.get("runtimePlatform").cloned(),
921 requires_attributes: Vec::new(),
922 registered_at: Utc::now(),
923 registered_by: request
924 .principal
925 .as_ref()
926 .map(|p| p.arn.clone())
927 .or(Some(format!("arn:aws:iam::{}:root", state.account_id))),
928 deregistered_at: None,
929 tags: tags.clone(),
930 enable_fault_injection: body.get("enableFaultInjection").and_then(|v| v.as_bool()),
931 };
932 let td_json = task_definition_to_json(&td);
933 state
934 .task_definitions
935 .entry(family.clone())
936 .or_default()
937 .insert(revision, td);
938
939 Ok(AwsResponse::ok_json(json!({
940 "taskDefinition": td_json,
941 "tags": tags_json(&tags),
942 })))
943 }
944
945 fn describe_task_definition(
946 &self,
947 request: &AwsRequest,
948 ) -> Result<AwsResponse, AwsServiceError> {
949 let body = request.json_body();
950 let td_ref = req_str(&body, "taskDefinition")?;
951 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
952 let include_tags = body
953 .get("include")
954 .and_then(|v| v.as_array())
955 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
956 .unwrap_or(false);
957
958 let account = target_account_for_task_definition(request, td_ref);
959 let accounts = self.state.read();
960 let state = accounts
961 .get(&account)
962 .ok_or_else(|| task_definition_not_found(td_ref))?;
963 let revisions = state
964 .task_definitions
965 .get(&family)
966 .ok_or_else(|| task_definition_not_found(td_ref))?;
967 let td = match rev {
968 Some(n) => revisions
969 .get(&n)
970 .ok_or_else(|| task_definition_not_found(td_ref))?,
971 None => latest_active_revision(revisions)
972 .ok_or_else(|| task_definition_not_found(td_ref))?,
973 };
974 let mut out = json!({"taskDefinition": task_definition_to_json(td)});
975 if include_tags {
976 out.as_object_mut()
977 .unwrap()
978 .insert("tags".into(), tags_json(&td.tags));
979 }
980 Ok(AwsResponse::ok_json(out))
981 }
982
983 fn deregister_task_definition(
984 &self,
985 request: &AwsRequest,
986 ) -> Result<AwsResponse, AwsServiceError> {
987 let body = request.json_body();
988 let td_ref = req_str(&body, "taskDefinition")?;
989 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
990 let rev =
991 rev.ok_or_else(|| client_exception("taskDefinition must reference a revision"))?;
992
993 let account = target_account_for_task_definition(request, td_ref);
994 let mut accounts = self.state.write();
995 let state = accounts.get_or_create(&account);
996 let revisions = state
997 .task_definitions
998 .get_mut(&family)
999 .ok_or_else(|| task_definition_not_found(td_ref))?;
1000 let td = revisions
1001 .get_mut(&rev)
1002 .ok_or_else(|| task_definition_not_found(td_ref))?;
1003 td.status = "INACTIVE".to_string();
1004 td.deregistered_at = Some(Utc::now());
1005 let snapshot = td.clone();
1006 Ok(AwsResponse::ok_json(json!({
1007 "taskDefinition": task_definition_to_json(&snapshot),
1008 })))
1009 }
1010
1011 fn delete_task_definitions(
1012 &self,
1013 request: &AwsRequest,
1014 ) -> Result<AwsResponse, AwsServiceError> {
1015 let body = request.json_body();
1016 let refs: Vec<String> = body
1017 .get("taskDefinitions")
1018 .and_then(|v| v.as_array())
1019 .map(|arr| {
1020 arr.iter()
1021 .filter_map(|v| v.as_str().map(String::from))
1022 .collect()
1023 })
1024 .ok_or_else(|| client_exception("Missing required field: taskDefinitions"))?;
1025 if refs.is_empty() {
1026 return Err(client_exception("taskDefinitions must not be empty"));
1027 }
1028
1029 let mut deleted = Vec::new();
1030 let mut failures = Vec::new();
1031 let account = request.account_id.clone();
1032 let mut accounts = self.state.write();
1033 let state = accounts.get_or_create(&account);
1034 for input in refs {
1035 let parsed = match resolve_task_definition_ref(&input) {
1036 Ok((_, family, Some(rev))) => Some((family, rev)),
1037 Ok(_) => None,
1038 Err(_) => None,
1039 };
1040 let Some((family, rev)) = parsed else {
1041 failures.push(json!({
1042 "arn": input,
1043 "reason": "INVALID_REFERENCE",
1044 "detail": "Expected family:revision or full task-definition ARN",
1045 }));
1046 continue;
1047 };
1048 let Some(revisions) = state.task_definitions.get_mut(&family) else {
1049 failures.push(json!({"arn": input, "reason": "MISSING"}));
1050 continue;
1051 };
1052 let Some(td) = revisions.get_mut(&rev) else {
1053 failures.push(json!({"arn": input, "reason": "MISSING"}));
1054 continue;
1055 };
1056 if td.status == "ACTIVE" {
1057 failures.push(json!({
1058 "arn": td.task_definition_arn.clone(),
1059 "reason": "MUST_BE_INACTIVE",
1060 "detail": "Task definitions must be deregistered before they can be deleted",
1061 }));
1062 continue;
1063 }
1064 td.status = "DELETE_IN_PROGRESS".to_string();
1065 deleted.push(task_definition_to_json(td));
1066 }
1067 Ok(AwsResponse::ok_json(json!({
1068 "taskDefinitions": deleted,
1069 "failures": failures,
1070 })))
1071 }
1072
1073 fn list_task_definitions(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1074 let body = request.json_body();
1075 let family_prefix = opt_str(&body, "familyPrefix");
1076 let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1077 let sort = opt_str(&body, "sort").unwrap_or("ASC");
1078 let max_results = body
1079 .get("maxResults")
1080 .and_then(|v| v.as_i64())
1081 .filter(|n| (1..=100).contains(n))
1082 .map(|n| n as usize)
1083 .unwrap_or(100);
1084 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1085
1086 let account = request.account_id.clone();
1087 let accounts = self.state.read();
1088 let mut arns: Vec<String> = Vec::new();
1089 if let Some(state) = accounts.get(&account) {
1090 for (family, revisions) in &state.task_definitions {
1091 if let Some(prefix) = family_prefix {
1092 if !family.starts_with(prefix) {
1093 continue;
1094 }
1095 }
1096 for td in revisions.values() {
1097 if td.status == status {
1098 arns.push(td.task_definition_arn.clone());
1099 }
1100 }
1101 }
1102 }
1103 if sort == "DESC" {
1104 arns.sort();
1105 arns.reverse();
1106 } else {
1107 arns.sort();
1108 }
1109 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1110 let end = (start + max_results).min(arns.len());
1111 let page = arns[start..end].to_vec();
1112 let mut out = json!({"taskDefinitionArns": page});
1113 if end < arns.len() {
1114 out.as_object_mut()
1115 .unwrap()
1116 .insert("nextToken".into(), json!(end.to_string()));
1117 }
1118 Ok(AwsResponse::ok_json(out))
1119 }
1120
1121 fn list_task_definition_families(
1122 &self,
1123 request: &AwsRequest,
1124 ) -> Result<AwsResponse, AwsServiceError> {
1125 let body = request.json_body();
1126 let family_prefix = opt_str(&body, "familyPrefix");
1127 let status = opt_str(&body, "status").unwrap_or("ACTIVE");
1128 let max_results = body
1129 .get("maxResults")
1130 .and_then(|v| v.as_i64())
1131 .filter(|n| (1..=100).contains(n))
1132 .map(|n| n as usize)
1133 .unwrap_or(100);
1134 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1135
1136 let account = request.account_id.clone();
1137 let accounts = self.state.read();
1138 let mut families: Vec<String> = Vec::new();
1139 if let Some(state) = accounts.get(&account) {
1140 for (family, revisions) in &state.task_definitions {
1141 if let Some(prefix) = family_prefix {
1142 if !family.starts_with(prefix) {
1143 continue;
1144 }
1145 }
1146 let matches_status = match status {
1147 "ACTIVE" => revisions.values().any(|td| td.status == "ACTIVE"),
1148 "INACTIVE" => revisions
1149 .values()
1150 .all(|td| td.status == "INACTIVE" || td.status == "DELETE_IN_PROGRESS"),
1151 "ALL" => true,
1152 _ => revisions.values().any(|td| td.status == status),
1153 };
1154 if matches_status {
1155 families.push(family.clone());
1156 }
1157 }
1158 }
1159 families.sort();
1160 let start = next_token.parse::<usize>().unwrap_or(0).min(families.len());
1161 let end = (start + max_results).min(families.len());
1162 let page = families[start..end].to_vec();
1163 let mut out = json!({"families": page});
1164 if end < families.len() {
1165 out.as_object_mut()
1166 .unwrap()
1167 .insert("nextToken".into(), json!(end.to_string()));
1168 }
1169 Ok(AwsResponse::ok_json(out))
1170 }
1171}
1172
1173fn validate_family_name(family: &str) -> Result<(), AwsServiceError> {
1174 if family.is_empty() || family.len() > 255 {
1175 return Err(invalid_parameter(
1176 "Task definition family must be 1-255 characters",
1177 ));
1178 }
1179 let ok = family
1180 .chars()
1181 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
1182 if !ok {
1183 return Err(invalid_parameter(
1184 "Task definition family may only contain letters, numbers, hyphens, and underscores",
1185 ));
1186 }
1187 Ok(())
1188}
1189
1190impl EcsService {
1193 fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1194 let body = request.json_body();
1195 let arn = req_str(&body, "resourceArn")?.to_string();
1196 let tags = parse_tags(&body);
1197 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1198 let mut accounts = self.state.write();
1199 let state = accounts.get_or_create(&account);
1200 match resource_type.as_str() {
1201 "cluster" => {
1202 let cluster = state
1203 .clusters
1204 .get_mut(&tail)
1205 .ok_or_else(|| resource_not_found(&arn))?;
1206 merge_tags(&mut cluster.tags, tags);
1207 }
1208 "task-definition" => {
1209 let (family, rev) = parse_family_revision(&tail);
1210 let rev = rev.ok_or_else(|| {
1211 invalid_parameter("task-definition ARN must include revision")
1212 })?;
1213 let td = state
1214 .task_definitions
1215 .get_mut(&family)
1216 .and_then(|m| m.get_mut(&rev))
1217 .ok_or_else(|| resource_not_found(&arn))?;
1218 merge_tags(&mut td.tags, tags);
1219 }
1220 other => {
1221 return Err(invalid_parameter(format!(
1222 "Tagging not yet supported for resource type: {other}"
1223 )));
1224 }
1225 }
1226 Ok(AwsResponse::ok_json(json!({})))
1227 }
1228
1229 fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1230 let body = request.json_body();
1231 let arn = req_str(&body, "resourceArn")?.to_string();
1232 let keys: Vec<String> = body
1233 .get("tagKeys")
1234 .and_then(|v| v.as_array())
1235 .map(|arr| {
1236 arr.iter()
1237 .filter_map(|v| v.as_str().map(String::from))
1238 .collect()
1239 })
1240 .unwrap_or_default();
1241 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1242 let mut accounts = self.state.write();
1243 let state = accounts.get_or_create(&account);
1244 match resource_type.as_str() {
1245 "cluster" => {
1246 let cluster = state
1247 .clusters
1248 .get_mut(&tail)
1249 .ok_or_else(|| resource_not_found(&arn))?;
1250 cluster.tags.retain(|t| !keys.contains(&t.key));
1251 }
1252 "task-definition" => {
1253 let (family, rev) = parse_family_revision(&tail);
1254 let rev = rev.ok_or_else(|| {
1255 invalid_parameter("task-definition ARN must include revision")
1256 })?;
1257 let td = state
1258 .task_definitions
1259 .get_mut(&family)
1260 .and_then(|m| m.get_mut(&rev))
1261 .ok_or_else(|| resource_not_found(&arn))?;
1262 td.tags.retain(|t| !keys.contains(&t.key));
1263 }
1264 other => {
1265 return Err(invalid_parameter(format!(
1266 "Tagging not yet supported for resource type: {other}"
1267 )));
1268 }
1269 }
1270 Ok(AwsResponse::ok_json(json!({})))
1271 }
1272
1273 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274 let body = request.json_body();
1275 let arn = req_str(&body, "resourceArn")?.to_string();
1276 let (account, resource_type, tail) = decode_ecs_arn(&arn)?;
1277 let accounts = self.state.read();
1278 let state = accounts
1279 .get(&account)
1280 .ok_or_else(|| resource_not_found(&arn))?;
1281 let tags = match resource_type.as_str() {
1282 "cluster" => state
1283 .clusters
1284 .get(&tail)
1285 .map(|c| c.tags.clone())
1286 .ok_or_else(|| resource_not_found(&arn))?,
1287 "task-definition" => {
1288 let (family, rev) = parse_family_revision(&tail);
1289 let rev = rev.ok_or_else(|| {
1290 invalid_parameter("task-definition ARN must include revision")
1291 })?;
1292 state
1293 .task_definitions
1294 .get(&family)
1295 .and_then(|m| m.get(&rev))
1296 .map(|td| td.tags.clone())
1297 .ok_or_else(|| resource_not_found(&arn))?
1298 }
1299 other => {
1300 return Err(invalid_parameter(format!(
1301 "ListTagsForResource not yet supported for resource type: {other}"
1302 )));
1303 }
1304 };
1305 Ok(AwsResponse::ok_json(json!({"tags": tags_json(&tags)})))
1306 }
1307}
1308
1309fn resource_not_found(arn: &str) -> AwsServiceError {
1310 AwsServiceError::aws_error(
1311 StatusCode::BAD_REQUEST,
1312 "ClientException",
1313 format!("The referenced resource was not found: {arn}"),
1314 )
1315}
1316
1317impl EcsService {
1320 fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1321 let body = request.json_body();
1322 let name = req_str(&body, "name")?.to_string();
1323 let value = req_str(&body, "value")?.to_string();
1324 let principal_arn = opt_str(&body, "principalArn")
1325 .map(String::from)
1326 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1327 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1328 let account = request.account_id.clone();
1329 let mut accounts = self.state.write();
1330 let state = accounts.get_or_create(&account);
1331 state
1332 .principal_account_settings
1333 .entry(principal_arn.clone())
1334 .or_default()
1335 .insert(name.clone(), value.clone());
1336 Ok(AwsResponse::ok_json(json!({
1337 "setting": {
1338 "name": name,
1339 "value": value,
1340 "principalArn": principal_arn,
1341 }
1342 })))
1343 }
1344
1345 fn put_account_setting_default(
1346 &self,
1347 request: &AwsRequest,
1348 ) -> Result<AwsResponse, AwsServiceError> {
1349 let body = request.json_body();
1350 let name = req_str(&body, "name")?.to_string();
1351 let value = req_str(&body, "value")?.to_string();
1352 let account = request.account_id.clone();
1353 let mut accounts = self.state.write();
1354 let state = accounts.get_or_create(&account);
1355 state
1356 .account_setting_defaults
1357 .insert(name.clone(), value.clone());
1358 Ok(AwsResponse::ok_json(json!({
1359 "setting": {
1360 "name": name,
1361 "value": value,
1362 "principalArn": format!("arn:aws:iam::{}:root", state.account_id),
1363 }
1364 })))
1365 }
1366
1367 fn delete_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1368 let body = request.json_body();
1369 let name = req_str(&body, "name")?.to_string();
1370 let principal_arn = opt_str(&body, "principalArn")
1371 .map(String::from)
1372 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1373 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
1374 let account = request.account_id.clone();
1375 let mut accounts = self.state.write();
1376 let state = accounts.get_or_create(&account);
1377 let removed_value = state
1378 .principal_account_settings
1379 .get_mut(&principal_arn)
1380 .and_then(|m| m.remove(&name));
1381 Ok(AwsResponse::ok_json(json!({
1382 "setting": {
1383 "name": name,
1384 "value": removed_value.unwrap_or_default(),
1385 "principalArn": principal_arn,
1386 }
1387 })))
1388 }
1389
1390 fn list_account_settings(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1391 let body = request.json_body();
1392 let name_filter = opt_str(&body, "name");
1393 let value_filter = opt_str(&body, "value");
1394 let principal_filter = opt_str(&body, "principalArn");
1395 let effective_only = body
1396 .get("effectiveSettings")
1397 .and_then(|v| v.as_bool())
1398 .unwrap_or(false);
1399
1400 let account = request.account_id.clone();
1401 let accounts = self.state.read();
1402 let Some(state) = accounts.get(&account) else {
1403 return Ok(AwsResponse::ok_json(json!({"settings": []})));
1404 };
1405 let root_arn = format!("arn:aws:iam::{}:root", state.account_id);
1406 let mut settings: Vec<Value> = Vec::new();
1407
1408 if effective_only {
1409 let principal = principal_filter
1412 .map(String::from)
1413 .or_else(|| request.principal.as_ref().map(|p| p.arn.clone()))
1414 .unwrap_or_else(|| root_arn.clone());
1415 let mut merged = state.account_setting_defaults.clone();
1416 if let Some(overrides) = state.principal_account_settings.get(&principal) {
1417 for (k, v) in overrides {
1418 merged.insert(k.clone(), v.clone());
1419 }
1420 }
1421 for (k, v) in merged {
1422 if matches_filter(name_filter, &k) && matches_filter(value_filter, &v) {
1423 settings.push(json!({
1424 "name": k,
1425 "value": v,
1426 "principalArn": principal,
1427 }));
1428 }
1429 }
1430 } else {
1431 for (k, v) in &state.account_setting_defaults {
1434 if matches_filter(name_filter, k)
1435 && matches_filter(value_filter, v)
1436 && (principal_filter.is_none() || principal_filter == Some(root_arn.as_str()))
1437 {
1438 settings.push(json!({
1439 "name": k,
1440 "value": v,
1441 "principalArn": root_arn,
1442 }));
1443 }
1444 }
1445 for (principal, entries) in &state.principal_account_settings {
1446 if principal_filter.is_some_and(|pf| pf != principal) {
1447 continue;
1448 }
1449 for (k, v) in entries {
1450 if matches_filter(name_filter, k) && matches_filter(value_filter, v) {
1451 settings.push(json!({
1452 "name": k,
1453 "value": v,
1454 "principalArn": principal,
1455 }));
1456 }
1457 }
1458 }
1459 }
1460
1461 Ok(AwsResponse::ok_json(json!({"settings": settings})))
1462 }
1463}
1464
/// Returns `true` when `filter` is absent or is exactly equal to `value`.
fn matches_filter(filter: Option<&str>, value: &str) -> bool {
    match filter {
        Some(expected) => expected == value,
        None => true,
    }
}
1468
1469impl EcsService {
1472 fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1473 let body = request.json_body();
1474 let td_ref = req_str(&body, "taskDefinition")?;
1475 let cluster_ref = opt_str(&body, "cluster");
1476 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1477 let launch_type = opt_str(&body, "launchType")
1478 .unwrap_or("FARGATE")
1479 .to_string();
1480 let count = body
1481 .get("count")
1482 .and_then(|v| v.as_i64())
1483 .filter(|n| (1..=10).contains(n))
1484 .unwrap_or(1) as usize;
1485 let group = opt_str(&body, "group").map(String::from);
1486 let started_by = opt_str(&body, "startedBy").map(String::from);
1487 let tags = parse_tags(&body);
1488
1489 if let Some(overrides) = body.get("overrides") {
1495 if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
1496 self.check_pass_role(&request.account_id, role_arn)?;
1497 }
1498 if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
1499 self.check_pass_role(&request.account_id, role_arn)?;
1500 }
1501 }
1502
1503 let account = request.account_id.clone();
1504 let runtime = self.runtime.clone();
1505 let mut accounts = self.state.write();
1506 let state = accounts.get_or_create(&account);
1507 let cluster_arn = state
1508 .clusters
1509 .get(&cluster_name)
1510 .map(|c| c.cluster_arn.clone())
1511 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
1512 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
1513 let revisions = state
1514 .task_definitions
1515 .get(&family)
1516 .ok_or_else(|| task_definition_not_found(td_ref))?;
1517 let td = match rev {
1518 Some(n) => revisions
1519 .get(&n)
1520 .ok_or_else(|| task_definition_not_found(td_ref))?,
1521 None => latest_active_revision(revisions)
1522 .ok_or_else(|| task_definition_not_found(td_ref))?,
1523 };
1524 if td.status != "ACTIVE" {
1525 return Err(client_exception(format!(
1526 "Task definition {} is not ACTIVE",
1527 td.task_definition_arn
1528 )));
1529 }
1530 let td_arn = td.task_definition_arn.clone();
1531 let td_family = td.family.clone();
1532 let td_revision = td.revision;
1533 let td_cpu = td.cpu.clone();
1534 let td_memory = td.memory.clone();
1535 let td_task_role = td.task_role_arn.clone();
1536 let td_exec_role = td.execution_role_arn.clone();
1537 let td_containers = td.container_definitions.clone();
1538
1539 let mut spawned_tasks: Vec<String> = Vec::new();
1540 let mut task_jsons: Vec<Value> = Vec::new();
1541 for _ in 0..count {
1542 let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
1543 let task_arn = state.task_arn(&cluster_name, &task_id);
1544 let containers: Vec<Container> = td_containers
1545 .iter()
1546 .map(|def| Container {
1547 container_arn: format!(
1548 "arn:aws:ecs:{}:{}:container/{}/{}/{}",
1549 state.region,
1550 state.account_id,
1551 cluster_name,
1552 task_id,
1553 def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
1554 ),
1555 name: def
1556 .get("name")
1557 .and_then(|v| v.as_str())
1558 .unwrap_or("app")
1559 .to_string(),
1560 image: def
1561 .get("image")
1562 .and_then(|v| v.as_str())
1563 .unwrap_or("")
1564 .to_string(),
1565 task_arn: task_arn.clone(),
1566 last_status: "PENDING".into(),
1567 exit_code: None,
1568 reason: None,
1569 runtime_id: None,
1570 essential: def
1571 .get("essential")
1572 .and_then(|v| v.as_bool())
1573 .unwrap_or(true),
1574 cpu: def
1575 .get("cpu")
1576 .and_then(|v| v.as_i64())
1577 .map(|n| n.to_string()),
1578 memory: def
1579 .get("memory")
1580 .and_then(|v| v.as_i64())
1581 .map(|n| n.to_string()),
1582 memory_reservation: def
1583 .get("memoryReservation")
1584 .and_then(|v| v.as_i64())
1585 .map(|n| n.to_string()),
1586 network_bindings: Vec::new(),
1587 network_interfaces: Vec::new(),
1588 health_status: Some("UNKNOWN".to_string()),
1589 managed_agents: None,
1590 })
1591 .collect();
1592 let awslogs = td_containers.iter().find_map(|def| {
1593 let name = def.get("name").and_then(|v| v.as_str())?.to_string();
1594 let log_cfg = def.get("logConfiguration")?;
1595 if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
1596 return None;
1597 }
1598 let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
1599 Some(AwsLogsConfig {
1600 group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
1601 stream_prefix: opts
1602 .get("awslogs-stream-prefix")
1603 .and_then(|v| v.as_str())
1604 .map(String::from),
1605 region: opts
1606 .get("awslogs-region")
1607 .and_then(|v| v.as_str())
1608 .unwrap_or(&state.region)
1609 .to_string(),
1610 container_name: name,
1611 })
1612 });
1613 let task = Task {
1614 task_arn: task_arn.clone(),
1615 task_id: task_id.clone(),
1616 cluster_arn: cluster_arn.clone(),
1617 cluster_name: cluster_name.clone(),
1618 task_definition_arn: td_arn.clone(),
1619 family: td_family.clone(),
1620 revision: td_revision,
1621 last_status: "PROVISIONING".into(),
1622 desired_status: "RUNNING".into(),
1623 launch_type: launch_type.clone(),
1624 platform_version: Some("1.4.0".into()),
1625 cpu: body
1626 .get("overrides")
1627 .and_then(|v| v.get("cpu"))
1628 .and_then(|v| v.as_str())
1629 .map(String::from)
1630 .or_else(|| td_cpu.clone()),
1631 memory: body
1632 .get("overrides")
1633 .and_then(|v| v.get("memory"))
1634 .and_then(|v| v.as_str())
1635 .map(String::from)
1636 .or_else(|| td_memory.clone()),
1637 containers,
1638 overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
1639 started_by: started_by.clone(),
1640 group: group.clone(),
1641 connectivity: "CONNECTING".into(),
1642 stop_code: None,
1643 stopped_reason: None,
1644 created_at: Utc::now(),
1645 started_at: None,
1646 stopping_at: None,
1647 stopped_at: None,
1648 pull_started_at: None,
1649 pull_stopped_at: None,
1650 connectivity_at: None,
1651 started_by_ref_id: None,
1652 execution_role_arn: td_exec_role.clone(),
1653 task_role_arn: td_task_role.clone(),
1654 tags: tags.clone(),
1655 awslogs,
1656 captured_logs: String::new(),
1657 protection: None,
1658 };
1659 state.tasks.insert(task_id.clone(), task.clone());
1660 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
1661 cluster.pending_tasks_count += 1;
1662 }
1663 if let Some(t) = state.tasks.get_mut(&task_id) {
1667 t.last_status = "PENDING".into();
1668 }
1669 task_jsons.push(task_to_json(&task));
1670 spawned_tasks.push(task_id.clone());
1671 }
1672 drop(accounts);
1673
1674 if let Some(rt) = runtime {
1676 for id in &spawned_tasks {
1677 rt.clone()
1678 .run_task(self.state.clone(), id.clone(), account.clone());
1679 }
1680 } else {
1681 let mut accounts = self.state.write();
1686 if let Some(state) = accounts.get_mut(&account) {
1687 let mut cluster_drains: Vec<String> = Vec::new();
1688 for id in &spawned_tasks {
1689 if let Some(t) = state.tasks.get_mut(id) {
1690 t.last_status = "STOPPED".into();
1691 t.desired_status = "STOPPED".into();
1692 t.stop_code = Some("TaskFailedToStart".into());
1693 t.stopped_reason = Some(
1694 "No container runtime available (docker/podman not installed)".into(),
1695 );
1696 t.stopped_at = Some(Utc::now());
1697 for c in t.containers.iter_mut() {
1698 c.last_status = "STOPPED".into();
1699 }
1700 cluster_drains.push(t.cluster_name.clone());
1701 }
1702 }
1703 for name in cluster_drains {
1704 if let Some(cluster) = state.clusters.get_mut(&name) {
1705 if cluster.pending_tasks_count > 0 {
1706 cluster.pending_tasks_count -= 1;
1707 }
1708 }
1709 }
1710 }
1711 }
1712
1713 Ok(AwsResponse::ok_json(json!({
1714 "tasks": task_jsons,
1715 "failures": [],
1716 })))
1717 }
1718
1719 fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1720 self.run_task(request)
1725 }
1726
1727 async fn stop_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1728 let body = request.json_body();
1729 let task_ref = req_str(&body, "task")?;
1730 let reason = opt_str(&body, "reason")
1731 .unwrap_or("UserInitiated")
1732 .to_string();
1733 let cluster_ref = opt_str(&body, "cluster");
1734 let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1735
1736 let (task_id, account, task_snapshot) = {
1737 let account = request.account_id.clone();
1738 let mut accounts = self.state.write();
1739 let state = accounts
1740 .get_mut(&account)
1741 .ok_or_else(|| task_not_found(task_ref))?;
1742 let task_id = resolve_task_id(state, task_ref)?;
1743 let task = state
1744 .tasks
1745 .get_mut(&task_id)
1746 .ok_or_else(|| task_not_found(task_ref))?;
1747 task.desired_status = "STOPPED".into();
1748 task.stopping_at = Some(Utc::now());
1749 task.stopped_reason = Some(reason.clone());
1750 task.stop_code = Some("UserInitiated".into());
1751 (task_id, account, task.clone())
1752 };
1753 if let Some(rt) = &self.runtime {
1754 rt.stop_task(&task_id, &reason).await;
1755 }
1756 let _ = account;
1757 Ok(AwsResponse::ok_json(json!({
1758 "task": task_to_json(&task_snapshot),
1759 })))
1760 }
1761
1762 fn describe_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1763 let body = request.json_body();
1764 let refs: Vec<String> = body
1765 .get("tasks")
1766 .and_then(|v| v.as_array())
1767 .map(|arr| {
1768 arr.iter()
1769 .filter_map(|v| v.as_str().map(String::from))
1770 .collect()
1771 })
1772 .unwrap_or_default();
1773 let include_tags = body
1774 .get("include")
1775 .and_then(|v| v.as_array())
1776 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
1777 .unwrap_or(false);
1778
1779 let account = request.account_id.clone();
1780 let accounts = self.state.read();
1781 let Some(state) = accounts.get(&account) else {
1782 return Ok(AwsResponse::ok_json(json!({
1783 "tasks": [],
1784 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
1785 })));
1786 };
1787 let mut found = Vec::new();
1788 let mut failures = Vec::new();
1789 for input in &refs {
1790 let task_id = task_id_from_ref(input);
1791 match state.tasks.get(&task_id) {
1792 Some(t) => {
1793 let mut v = task_to_json(t);
1794 if include_tags {
1795 v.as_object_mut()
1796 .unwrap()
1797 .insert("tags".into(), tags_json(&t.tags));
1798 }
1799 found.push(v);
1800 }
1801 None => {
1802 failures.push(json!({
1803 "arn": input,
1804 "reason": "MISSING",
1805 }));
1806 }
1807 }
1808 }
1809 Ok(AwsResponse::ok_json(json!({
1810 "tasks": found,
1811 "failures": failures,
1812 })))
1813 }
1814
1815 fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1816 let body = request.json_body();
1817 let cluster_ref = opt_str(&body, "cluster");
1818 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1819 let family = opt_str(&body, "family");
1820 let status_filter = opt_str(&body, "desiredStatus");
1821 let started_by = opt_str(&body, "startedBy");
1822 let max_results = body
1823 .get("maxResults")
1824 .and_then(|v| v.as_i64())
1825 .filter(|n| (1..=100).contains(n))
1826 .map(|n| n as usize)
1827 .unwrap_or(100);
1828 let next_token = opt_str(&body, "nextToken").unwrap_or("");
1829
1830 let account = request.account_id.clone();
1831 let accounts = self.state.read();
1832 let mut arns: Vec<String> = match accounts.get(&account) {
1833 Some(state) => state
1834 .tasks
1835 .values()
1836 .filter(|t| t.cluster_name == cluster_name)
1837 .filter(|t| family.is_none_or(|f| t.family == f))
1838 .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
1839 .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
1840 .map(|t| t.task_arn.clone())
1841 .collect(),
1842 None => Vec::new(),
1843 };
1844 arns.sort();
1845 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
1846 let end = (start + max_results).min(arns.len());
1847 let page = arns[start..end].to_vec();
1848 let mut out = json!({"taskArns": page});
1849 if end < arns.len() {
1850 out.as_object_mut()
1851 .unwrap()
1852 .insert("nextToken".into(), json!(end.to_string()));
1853 }
1854 Ok(AwsResponse::ok_json(out))
1855 }
1856}
1857
1858fn task_not_found(task_ref: &str) -> AwsServiceError {
1859 AwsServiceError::aws_error(
1860 StatusCode::BAD_REQUEST,
1861 "ClientException",
1862 format!("Task not found: {task_ref}"),
1863 )
1864}
1865
/// Extracts the task id from a task reference.
///
/// The id is whatever follows the last `/` of `input`, so both full task
/// ARNs (`…:task/<cluster>/<id>`) and `<cluster>/<id>` forms are reduced to
/// `<id>`. Input without any `/` is returned unchanged; input ending in `/`
/// yields an empty string.
fn task_id_from_ref(input: &str) -> String {
    // `rsplit_once` is `None` exactly when there is no slash to split on,
    // which makes the "bare id" case explicit instead of unreachable.
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_string()
}
1873
1874fn resolve_task_id(state: &EcsState, task_ref: &str) -> Result<String, AwsServiceError> {
1875 let id = task_id_from_ref(task_ref);
1876 if state.tasks.contains_key(&id) {
1877 Ok(id)
1878 } else {
1879 Err(task_not_found(task_ref))
1880 }
1881}
1882
1883fn task_to_json(task: &Task) -> Value {
1884 let mut map = serde_json::Map::new();
1885 map.insert("taskArn".into(), json!(task.task_arn));
1886 map.insert("clusterArn".into(), json!(task.cluster_arn));
1887 map.insert("taskDefinitionArn".into(), json!(task.task_definition_arn));
1888 map.insert("lastStatus".into(), json!(task.last_status));
1889 map.insert("desiredStatus".into(), json!(task.desired_status));
1890 map.insert("launchType".into(), json!(task.launch_type));
1891 if let Some(ref v) = task.platform_version {
1892 map.insert("platformVersion".into(), json!(v));
1893 }
1894 if let Some(ref v) = task.cpu {
1895 map.insert("cpu".into(), json!(v));
1896 }
1897 if let Some(ref v) = task.memory {
1898 map.insert("memory".into(), json!(v));
1899 }
1900 map.insert(
1901 "containers".into(),
1902 Value::Array(task.containers.iter().map(container_to_json).collect()),
1903 );
1904 map.insert("overrides".into(), task.overrides.clone());
1905 if let Some(ref v) = task.started_by {
1906 map.insert("startedBy".into(), json!(v));
1907 }
1908 if let Some(ref v) = task.group {
1909 map.insert("group".into(), json!(v));
1910 }
1911 map.insert("connectivity".into(), json!(task.connectivity));
1912 if let Some(ref v) = task.stop_code {
1913 map.insert("stopCode".into(), json!(v));
1914 }
1915 if let Some(ref v) = task.stopped_reason {
1916 map.insert("stoppedReason".into(), json!(v));
1917 }
1918 if let Some(ref v) = task.task_role_arn {
1919 map.insert("taskRoleArn".into(), json!(v));
1920 }
1921 if let Some(ref v) = task.execution_role_arn {
1922 map.insert("executionRoleArn".into(), json!(v));
1923 }
1924 map.insert("createdAt".into(), json!(task.created_at.timestamp()));
1925 if let Some(ts) = task.started_at {
1926 map.insert("startedAt".into(), json!(ts.timestamp()));
1927 }
1928 if let Some(ts) = task.stopping_at {
1929 map.insert("stoppingAt".into(), json!(ts.timestamp()));
1930 }
1931 if let Some(ts) = task.stopped_at {
1932 map.insert("stoppedAt".into(), json!(ts.timestamp()));
1933 }
1934 if let Some(ts) = task.pull_started_at {
1935 map.insert("pullStartedAt".into(), json!(ts.timestamp()));
1936 }
1937 if let Some(ts) = task.pull_stopped_at {
1938 map.insert("pullStoppedAt".into(), json!(ts.timestamp()));
1939 }
1940 if let Some(ts) = task.connectivity_at {
1941 map.insert("connectivityAt".into(), json!(ts.timestamp()));
1942 }
1943 Value::Object(map)
1944}
1945
1946fn container_to_json(container: &Container) -> Value {
1947 let mut map = serde_json::Map::new();
1948 map.insert("containerArn".into(), json!(container.container_arn));
1949 map.insert("taskArn".into(), json!(container.task_arn));
1950 map.insert("name".into(), json!(container.name));
1951 map.insert("image".into(), json!(container.image));
1952 map.insert("lastStatus".into(), json!(container.last_status));
1953 map.insert("essential".into(), json!(container.essential));
1954 if let Some(code) = container.exit_code {
1955 map.insert("exitCode".into(), json!(code));
1956 }
1957 if let Some(ref r) = container.reason {
1958 map.insert("reason".into(), json!(r));
1959 }
1960 if let Some(ref id) = container.runtime_id {
1961 map.insert("runtimeId".into(), json!(id));
1962 }
1963 if let Some(ref v) = container.cpu {
1964 map.insert("cpu".into(), json!(v));
1965 }
1966 if let Some(ref v) = container.memory {
1967 map.insert("memory".into(), json!(v));
1968 }
1969 if let Some(ref v) = container.memory_reservation {
1970 map.insert("memoryReservation".into(), json!(v));
1971 }
1972 map.insert("networkBindings".into(), json!(container.network_bindings));
1973 map.insert(
1974 "networkInterfaces".into(),
1975 json!(container.network_interfaces),
1976 );
1977 if let Some(ref v) = container.health_status {
1978 map.insert("healthStatus".into(), json!(v));
1979 }
1980 Value::Object(map)
1981}
1982
1983impl EcsService {
1986 fn create_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1987 let body = request.json_body();
1988 let service_name = req_str(&body, "serviceName")?.to_string();
1989 validate_service_name(&service_name)?;
1990 let td_ref = req_str(&body, "taskDefinition")?;
1991 let cluster_ref = opt_str(&body, "cluster");
1992 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
1993 let desired_count = body
1994 .get("desiredCount")
1995 .and_then(|v| v.as_i64())
1996 .filter(|n| *n >= 0)
1997 .unwrap_or(1) as i32;
1998 let launch_type = opt_str(&body, "launchType")
1999 .unwrap_or("FARGATE")
2000 .to_string();
2001 let scheduling = opt_str(&body, "schedulingStrategy")
2002 .unwrap_or("REPLICA")
2003 .to_string();
2004 let deployment_controller = body
2005 .get("deploymentController")
2006 .and_then(|v| v.get("type"))
2007 .and_then(|v| v.as_str())
2008 .unwrap_or("ECS")
2009 .to_string();
2010 let deployment_config = body.get("deploymentConfiguration");
2011 let min_healthy = deployment_config
2012 .and_then(|d| d.get("minimumHealthyPercent"))
2013 .and_then(|v| v.as_i64())
2014 .map(|n| n as i32);
2015 let max_percent = deployment_config
2016 .and_then(|d| d.get("maximumPercent"))
2017 .and_then(|v| v.as_i64())
2018 .map(|n| n as i32);
2019 let circuit = deployment_config.and_then(|d| d.get("deploymentCircuitBreaker"));
2020 let circuit_breaker = circuit.map(|c| CircuitBreakerConfig {
2021 enable: c.get("enable").and_then(|v| v.as_bool()).unwrap_or(false),
2022 rollback: c.get("rollback").and_then(|v| v.as_bool()).unwrap_or(false),
2023 });
2024 let tags = parse_tags(&body);
2025 let role_arn = opt_str(&body, "role").map(String::from);
2026 let load_balancers: Vec<Value> = body
2027 .get("loadBalancers")
2028 .and_then(|v| v.as_array())
2029 .cloned()
2030 .unwrap_or_default();
2031 let service_registries: Vec<Value> = body
2032 .get("serviceRegistries")
2033 .and_then(|v| v.as_array())
2034 .cloned()
2035 .unwrap_or_default();
2036 let placement_constraints: Vec<Value> = body
2037 .get("placementConstraints")
2038 .and_then(|v| v.as_array())
2039 .cloned()
2040 .unwrap_or_default();
2041 let placement_strategy: Vec<Value> = body
2042 .get("placementStrategy")
2043 .and_then(|v| v.as_array())
2044 .cloned()
2045 .unwrap_or_default();
2046 let network_configuration = body.get("networkConfiguration").cloned();
2047
2048 let runtime = self.runtime.clone();
2049 let account = request.account_id.clone();
2050 let principal_arn = request
2051 .principal
2052 .as_ref()
2053 .map(|p| p.arn.clone())
2054 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2055
2056 let (service_json, spawn_task_ids) = {
2057 let mut accounts = self.state.write();
2058 let state = accounts.get_or_create(&account);
2059 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2061 let revisions = state
2062 .task_definitions
2063 .get(&family)
2064 .ok_or_else(|| task_definition_not_found(td_ref))?;
2065 let td = match rev {
2066 Some(n) => revisions
2067 .get(&n)
2068 .ok_or_else(|| task_definition_not_found(td_ref))?,
2069 None => latest_active_revision(revisions)
2070 .ok_or_else(|| task_definition_not_found(td_ref))?,
2071 };
2072 let td_arn = td.task_definition_arn.clone();
2073 let td_family = td.family.clone();
2074 let td_revision = td.revision;
2075 let cluster_arn = state
2076 .clusters
2077 .get(&cluster_name)
2078 .map(|c| c.cluster_arn.clone())
2079 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2080 let service_arn = state.service_arn(&cluster_name, &service_name);
2081 let key = EcsState::service_key(&cluster_name, &service_name);
2082 if let Some(existing) = state.services.get(&key) {
2083 if existing.status != "INACTIVE" {
2084 return Err(service_already_exists(&service_name));
2085 }
2086 }
2087 let deployment = Deployment {
2088 deployment_id: format!(
2089 "ecs-svc/{}",
2090 uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2091 ),
2092 status: "PRIMARY".into(),
2093 task_definition_arn: td_arn.clone(),
2094 desired_count,
2095 pending_count: 0,
2096 running_count: 0,
2097 failed_tasks: 0,
2098 created_at: Utc::now(),
2099 updated_at: Utc::now(),
2100 launch_type: launch_type.clone(),
2101 rollout_state: "IN_PROGRESS".into(),
2102 rollout_state_reason: Some("ECS deployment in progress.".into()),
2103 };
2104 let service = Service {
2105 service_name: service_name.clone(),
2106 service_arn: service_arn.clone(),
2107 cluster_name: cluster_name.clone(),
2108 cluster_arn: cluster_arn.clone(),
2109 task_definition_arn: td_arn,
2110 family: td_family,
2111 revision: td_revision,
2112 desired_count,
2113 running_count: 0,
2114 pending_count: 0,
2115 launch_type: launch_type.clone(),
2116 status: "ACTIVE".into(),
2117 scheduling_strategy: scheduling,
2118 deployment_controller,
2119 minimum_healthy_percent: min_healthy,
2120 maximum_percent: max_percent,
2121 circuit_breaker,
2122 deployments: vec![deployment],
2123 load_balancers,
2124 service_registries,
2125 placement_constraints,
2126 placement_strategy,
2127 network_configuration,
2128 tags: tags.clone(),
2129 created_at: Utc::now(),
2130 created_by: Some(principal_arn.clone()),
2131 role_arn,
2132 };
2133 state.services.insert(key.clone(), service.clone());
2134 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2135 cluster.active_services_count += 1;
2136 }
2137 state.push_event(crate::state::LifecycleEvent {
2138 at: Utc::now(),
2139 event_type: "ServiceCreated".into(),
2140 task_arn: None,
2141 cluster_arn: Some(cluster_arn),
2142 last_status: Some("ACTIVE".into()),
2143 detail: json!({"serviceArn": service_arn, "desiredCount": desired_count}),
2144 });
2145 let ids =
2146 spawn_service_tasks(state, &service, desired_count, &principal_arn, &launch_type);
2147 (service_to_json(state.services.get(&key).unwrap()), ids)
2148 };
2149
2150 if let Some(rt) = runtime {
2151 for id in spawn_task_ids {
2152 rt.clone().run_task(self.state.clone(), id, account.clone());
2153 }
2154 }
2155
2156 Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2157 }
2158
2159 fn update_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2160 let body = request.json_body();
2161 let service_ref = req_str(&body, "service")?;
2162 let service_name = service_name_from_ref(service_ref);
2163 let cluster_ref = opt_str(&body, "cluster");
2164 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2165 let new_desired = body.get("desiredCount").and_then(|v| v.as_i64());
2166 let new_td_ref = opt_str(&body, "taskDefinition");
2167 let account = request.account_id.clone();
2168 let principal_arn = request
2169 .principal
2170 .as_ref()
2171 .map(|p| p.arn.clone())
2172 .unwrap_or_else(|| format!("arn:aws:iam::{}:root", request.account_id));
2173 let runtime = self.runtime.clone();
2174
2175 let (service_json, spawn_ids, stop_ids) = {
2176 let mut accounts = self.state.write();
2177 let state = accounts
2178 .get_mut(&account)
2179 .ok_or_else(|| service_not_found(&service_name))?;
2180 let key = EcsState::service_key(&cluster_name, &service_name);
2181 if !state.services.contains_key(&key) {
2182 return Err(service_not_found(&service_name));
2183 }
2184
2185 let (new_td_arn, new_family, new_revision) = if let Some(td_ref) = new_td_ref {
2187 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
2188 let revisions = state
2189 .task_definitions
2190 .get(&family)
2191 .ok_or_else(|| task_definition_not_found(td_ref))?;
2192 let td = match rev {
2193 Some(n) => revisions
2194 .get(&n)
2195 .ok_or_else(|| task_definition_not_found(td_ref))?,
2196 None => latest_active_revision(revisions)
2197 .ok_or_else(|| task_definition_not_found(td_ref))?,
2198 };
2199 (
2200 Some(td.task_definition_arn.clone()),
2201 td.family.clone(),
2202 td.revision,
2203 )
2204 } else {
2205 let svc = state.services.get(&key).unwrap();
2206 (None, svc.family.clone(), svc.revision)
2207 };
2208
2209 let service_cluster_arn;
2210 let launch_type_clone;
2211 let effective_desired;
2212 let old_desired;
2213 let mut old_deployments_drained: Vec<String> = Vec::new();
2214 let mut new_deployment_triggered = false;
2215
2216 {
2217 let svc = state.services.get_mut(&key).unwrap();
2218 old_desired = svc.desired_count;
2219 service_cluster_arn = svc.cluster_arn.clone();
2220 launch_type_clone = svc.launch_type.clone();
2221
2222 if let Some(n) = new_desired {
2223 let n = n.max(0) as i32;
2224 svc.desired_count = n;
2225 if let Some(d) = svc.deployments.iter_mut().find(|d| d.status == "PRIMARY") {
2226 d.desired_count = n;
2227 d.updated_at = Utc::now();
2228 }
2229 }
2230
2231 if let Some(arn) = new_td_arn.clone() {
2232 for d in svc.deployments.iter_mut() {
2235 if d.status == "PRIMARY" {
2236 d.status = "ACTIVE".into();
2237 old_deployments_drained.push(d.deployment_id.clone());
2238 }
2239 }
2240 svc.deployments.insert(
2241 0,
2242 Deployment {
2243 deployment_id: format!(
2244 "ecs-svc/{}",
2245 uuid::Uuid::new_v4().as_u128() & 0xffff_ffff_ffff_ffff
2246 ),
2247 status: "PRIMARY".into(),
2248 task_definition_arn: arn.clone(),
2249 desired_count: svc.desired_count,
2250 pending_count: 0,
2251 running_count: 0,
2252 failed_tasks: 0,
2253 created_at: Utc::now(),
2254 updated_at: Utc::now(),
2255 launch_type: svc.launch_type.clone(),
2256 rollout_state: "IN_PROGRESS".into(),
2257 rollout_state_reason: Some("ECS deployment in progress.".into()),
2258 },
2259 );
2260 svc.task_definition_arn = arn;
2261 svc.family = new_family;
2262 svc.revision = new_revision;
2263 new_deployment_triggered = true;
2264 }
2265
2266 effective_desired = svc.desired_count;
2267 }
2268
2269 let mut spawn: Vec<String> = Vec::new();
2271 let mut stop: Vec<String> = Vec::new();
2272
2273 let service_tag = format!("ecs-svc/{}", service_name);
2275 let current_tasks: Vec<(String, String)> = state
2276 .tasks
2277 .iter()
2278 .filter(|(_, t)| {
2279 t.started_by.as_deref() == Some(service_tag.as_str())
2280 && t.cluster_name == cluster_name
2281 && t.last_status != "STOPPED"
2282 })
2283 .map(|(id, t)| (id.clone(), t.task_definition_arn.clone()))
2284 .collect();
2285
2286 let current_count = current_tasks.len() as i32;
2287 if effective_desired > current_count {
2288 let add = (effective_desired - current_count) as usize;
2289 let svc_snapshot = state.services.get(&key).unwrap().clone();
2290 let mut new_ids = spawn_service_tasks(
2291 state,
2292 &svc_snapshot,
2293 add as i32,
2294 &principal_arn,
2295 &launch_type_clone,
2296 );
2297 spawn.append(&mut new_ids);
2298 } else if effective_desired < current_count {
2299 let remove = (current_count - effective_desired) as usize;
2300 for (id, _) in current_tasks.iter().take(remove) {
2301 stop.push(id.clone());
2302 }
2303 }
2304
2305 if new_deployment_triggered {
2311 let new_td_arn_match = state
2312 .services
2313 .get(&key)
2314 .unwrap()
2315 .task_definition_arn
2316 .clone();
2317 let kept_on_new_td: i32 = current_tasks
2321 .iter()
2322 .filter(|(id, t_arn)| *t_arn == new_td_arn_match && !stop.contains(id))
2323 .count() as i32;
2324 for (id, t_arn) in ¤t_tasks {
2325 if *t_arn != new_td_arn_match && !stop.contains(id) {
2326 stop.push(id.clone());
2327 }
2328 }
2329 let already_spawned = spawn.len() as i32;
2330 let need = (effective_desired - kept_on_new_td - already_spawned).max(0);
2331 if need > 0 {
2332 let svc_snapshot = state.services.get(&key).unwrap().clone();
2333 let mut more = spawn_service_tasks(
2334 state,
2335 &svc_snapshot,
2336 need,
2337 &principal_arn,
2338 &launch_type_clone,
2339 );
2340 spawn.append(&mut more);
2341 }
2342 }
2343
2344 state.push_event(crate::state::LifecycleEvent {
2345 at: Utc::now(),
2346 event_type: "ServiceUpdated".into(),
2347 task_arn: None,
2348 cluster_arn: Some(service_cluster_arn),
2349 last_status: Some("ACTIVE".into()),
2350 detail: json!({
2351 "serviceArn": state.services.get(&key).unwrap().service_arn,
2352 "desiredCount": effective_desired,
2353 "previousDesiredCount": old_desired,
2354 "newDeployment": new_deployment_triggered,
2355 "drainedDeployments": old_deployments_drained,
2356 }),
2357 });
2358
2359 let svc = state.services.get(&key).unwrap();
2360 (service_to_json(svc), spawn, stop)
2361 };
2362
2363 if let Some(rt) = runtime {
2364 for id in spawn_ids {
2365 rt.clone().run_task(self.state.clone(), id, account.clone());
2366 }
2367 for id in stop_ids {
2368 let rt2 = rt.clone();
2369 let id_clone = id.clone();
2370 tokio::spawn(async move {
2371 rt2.stop_task(&id_clone, "ECS service scale-down").await;
2372 });
2373 let mut accounts = self.state.write();
2375 if let Some(state) = accounts.get_mut(&account) {
2376 if let Some(task) = state.tasks.get_mut(&id) {
2377 task.desired_status = "STOPPED".into();
2378 task.stopping_at = Some(Utc::now());
2379 }
2380 }
2381 }
2382 }
2383
2384 Ok(AwsResponse::ok_json(json!({ "service": service_json })))
2385 }
2386
2387 async fn delete_service(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2388 let body = request.json_body();
2389 let service_ref = req_str(&body, "service")?;
2390 let service_name = service_name_from_ref(service_ref);
2391 let cluster_ref = opt_str(&body, "cluster");
2392 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2393 let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
2394
2395 let (snapshot, task_ids_to_stop) = {
2396 let mut accounts = self.state.write();
2397 let state = accounts
2398 .get_mut(&request.account_id)
2399 .ok_or_else(|| service_not_found(&service_name))?;
2400 let key = EcsState::service_key(&cluster_name, &service_name);
2401 let svc = state
2402 .services
2403 .get_mut(&key)
2404 .ok_or_else(|| service_not_found(&service_name))?;
2405 if !force && svc.desired_count > 0 {
2406 return Err(client_exception(
2407 "The service cannot be stopped while it is scaled above 0. \
2408 Either set desiredCount to 0 first, or pass force=true.",
2409 ));
2410 }
2411 svc.desired_count = 0;
2412 svc.status = "DRAINING".into();
2413 let service_tag = format!("ecs-svc/{}", service_name);
2414 let stop_ids: Vec<String> = state
2415 .tasks
2416 .iter()
2417 .filter(|(_, t)| {
2418 t.started_by.as_deref() == Some(service_tag.as_str())
2419 && t.cluster_name == cluster_name
2420 && t.last_status != "STOPPED"
2421 })
2422 .map(|(id, _)| id.clone())
2423 .collect();
2424 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2425 if cluster.active_services_count > 0 {
2426 cluster.active_services_count -= 1;
2427 }
2428 }
2429 let svc_snapshot = state.services.get(&key).unwrap().clone();
2430 state.services.remove(&key);
2431 state.push_event(crate::state::LifecycleEvent {
2432 at: Utc::now(),
2433 event_type: "ServiceDeleted".into(),
2434 task_arn: None,
2435 cluster_arn: Some(svc_snapshot.cluster_arn.clone()),
2436 last_status: Some("DRAINING".into()),
2437 detail: json!({"serviceArn": svc_snapshot.service_arn}),
2438 });
2439 (svc_snapshot, stop_ids)
2440 };
2441
2442 if let Some(rt) = &self.runtime {
2443 for id in &task_ids_to_stop {
2444 rt.stop_task(id, "ECS service deletion").await;
2445 let mut accounts = self.state.write();
2446 if let Some(state) = accounts.get_mut(&request.account_id) {
2447 if let Some(task) = state.tasks.get_mut(id) {
2448 task.desired_status = "STOPPED".into();
2449 task.stopping_at = Some(Utc::now());
2450 }
2451 }
2452 }
2453 }
2454
2455 Ok(AwsResponse::ok_json(
2456 json!({ "service": service_to_json(&snapshot) }),
2457 ))
2458 }
2459
2460 fn describe_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2461 let body = request.json_body();
2462 let cluster_ref = opt_str(&body, "cluster");
2463 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2464 let refs: Vec<String> = body
2465 .get("services")
2466 .and_then(|v| v.as_array())
2467 .map(|arr| {
2468 arr.iter()
2469 .filter_map(|v| v.as_str().map(String::from))
2470 .collect()
2471 })
2472 .unwrap_or_default();
2473
2474 let account = request.account_id.clone();
2475 let accounts = self.state.read();
2476 let mut found = Vec::new();
2477 let mut failures = Vec::new();
2478 let Some(state) = accounts.get(&account) else {
2479 for r in &refs {
2480 failures.push(json!({"arn": r, "reason": "MISSING"}));
2481 }
2482 return Ok(AwsResponse::ok_json(
2483 json!({"services": found, "failures": failures}),
2484 ));
2485 };
2486 for r in &refs {
2487 let name = service_name_from_ref(r);
2488 let key = EcsState::service_key(&cluster_name, &name);
2489 match state.services.get(&key) {
2490 Some(svc) => {
2491 let mut v = service_to_json(svc);
2492 recompute_service_counts(state, &name, &cluster_name, &mut v);
2494 found.push(v);
2495 }
2496 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
2497 }
2498 }
2499 Ok(AwsResponse::ok_json(json!({
2500 "services": found,
2501 "failures": failures,
2502 })))
2503 }
2504
2505 fn list_services(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2506 let body = request.json_body();
2507 let cluster_ref = opt_str(&body, "cluster");
2508 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2509 let launch_type = opt_str(&body, "launchType");
2510 let scheduling = opt_str(&body, "schedulingStrategy");
2511 let max_results = body
2512 .get("maxResults")
2513 .and_then(|v| v.as_i64())
2514 .filter(|n| (1..=100).contains(n))
2515 .map(|n| n as usize)
2516 .unwrap_or(100);
2517 let next_token = opt_str(&body, "nextToken").unwrap_or("");
2518
2519 let account = request.account_id.clone();
2520 let accounts = self.state.read();
2521 let mut arns: Vec<String> = match accounts.get(&account) {
2522 Some(state) => state
2523 .services
2524 .values()
2525 .filter(|s| s.cluster_name == cluster_name)
2526 .filter(|s| launch_type.is_none_or(|lt| s.launch_type == lt))
2527 .filter(|s| scheduling.is_none_or(|sc| s.scheduling_strategy == sc))
2528 .map(|s| s.service_arn.clone())
2529 .collect(),
2530 None => Vec::new(),
2531 };
2532 arns.sort();
2533 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
2534 let end = (start + max_results).min(arns.len());
2535 let page = arns[start..end].to_vec();
2536 let mut out = json!({"serviceArns": page});
2537 if end < arns.len() {
2538 out.as_object_mut()
2539 .unwrap()
2540 .insert("nextToken".into(), json!(end.to_string()));
2541 }
2542 Ok(AwsResponse::ok_json(out))
2543 }
2544
2545 fn list_services_by_namespace(
2546 &self,
2547 request: &AwsRequest,
2548 ) -> Result<AwsResponse, AwsServiceError> {
2549 let body = request.json_body();
2555 let namespace = req_str(&body, "namespace")?.to_string();
2556 let account = request.account_id.clone();
2557 let accounts = self.state.read();
2558 let mut arns: Vec<String> = match accounts.get(&account) {
2559 Some(state) => state
2560 .services
2561 .values()
2562 .filter(|s| {
2563 s.service_registries.iter().any(|r| {
2564 r.get("registryArn")
2565 .and_then(|v| v.as_str())
2566 .is_some_and(|arn| arn.contains(&namespace))
2567 })
2568 })
2569 .map(|s| s.service_arn.clone())
2570 .collect(),
2571 None => Vec::new(),
2572 };
2573 arns.sort();
2574 Ok(AwsResponse::ok_json(json!({"serviceArns": arns})))
2575 }
2576}
2577
2578fn validate_service_name(name: &str) -> Result<(), AwsServiceError> {
2579 if name.is_empty() || name.len() > 255 {
2580 return Err(invalid_parameter("Service name must be 1-255 characters"));
2581 }
2582 let ok = name
2583 .chars()
2584 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-');
2585 if !ok {
2586 return Err(invalid_parameter(
2587 "Service name may only contain letters, numbers, hyphens, and underscores",
2588 ));
2589 }
2590 Ok(())
2591}
2592
/// Extracts the service name from a service reference.
///
/// The reference may be a bare name or any `/`-separated form such as an ARN;
/// the text after the last `/` is returned. A reference without `/` is
/// returned unchanged, and a reference ending in `/` yields an empty string.
fn service_name_from_ref(input: &str) -> String {
    // `rsplit_once` is `None` only when there is no `/`, in which case the
    // whole input is the name. This replaces an `rsplit(..).next()` check whose
    // fallback branch could never run.
    input
        .rsplit_once('/')
        .map_or(input, |(_, name)| name)
        .to_string()
}
2599
2600fn service_not_found(name: &str) -> AwsServiceError {
2601 AwsServiceError::aws_error(
2602 StatusCode::BAD_REQUEST,
2603 "ServiceNotFoundException",
2604 format!("The service could not be found: {name}"),
2605 )
2606}
2607
2608fn service_already_exists(name: &str) -> AwsServiceError {
2609 AwsServiceError::aws_error(
2610 StatusCode::BAD_REQUEST,
2611 "ServiceNotActiveException",
2612 format!("The service {name} already exists"),
2613 )
2614}
2615
2616fn spawn_service_tasks(
2621 state: &mut EcsState,
2622 service: &Service,
2623 count: i32,
2624 principal_arn: &str,
2625 launch_type: &str,
2626) -> Vec<String> {
2627 if count <= 0 {
2628 return Vec::new();
2629 }
2630 let Some(revisions) = state.task_definitions.get(&service.family) else {
2631 return Vec::new();
2632 };
2633 let Some(td) = revisions.get(&service.revision) else {
2634 return Vec::new();
2635 };
2636 let container_defs = td.container_definitions.clone();
2637 let cpu = td.cpu.clone();
2638 let memory = td.memory.clone();
2639 let task_role = td.task_role_arn.clone();
2640 let exec_role = td.execution_role_arn.clone();
2641 let cluster_name = service.cluster_name.clone();
2642 let cluster_arn = service.cluster_arn.clone();
2643 let td_arn = service.task_definition_arn.clone();
2644 let family = service.family.clone();
2645 let revision = service.revision;
2646 let service_tag = format!("ecs-svc/{}", service.service_name);
2647
2648 let mut ids = Vec::with_capacity(count as usize);
2649 for _ in 0..count {
2650 let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
2651 let task_arn = state.task_arn(&cluster_name, &task_id);
2652 let containers: Vec<Container> = container_defs
2653 .iter()
2654 .map(|def| Container {
2655 container_arn: format!(
2656 "arn:aws:ecs:{}:{}:container/{}/{}/{}",
2657 state.region,
2658 state.account_id,
2659 cluster_name,
2660 task_id,
2661 def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
2662 ),
2663 name: def
2664 .get("name")
2665 .and_then(|v| v.as_str())
2666 .unwrap_or("app")
2667 .to_string(),
2668 image: def
2669 .get("image")
2670 .and_then(|v| v.as_str())
2671 .unwrap_or("")
2672 .to_string(),
2673 task_arn: task_arn.clone(),
2674 last_status: "PENDING".into(),
2675 exit_code: None,
2676 reason: None,
2677 runtime_id: None,
2678 essential: def
2679 .get("essential")
2680 .and_then(|v| v.as_bool())
2681 .unwrap_or(true),
2682 cpu: def
2683 .get("cpu")
2684 .and_then(|v| v.as_i64())
2685 .map(|n| n.to_string()),
2686 memory: def
2687 .get("memory")
2688 .and_then(|v| v.as_i64())
2689 .map(|n| n.to_string()),
2690 memory_reservation: def
2691 .get("memoryReservation")
2692 .and_then(|v| v.as_i64())
2693 .map(|n| n.to_string()),
2694 network_bindings: Vec::new(),
2695 network_interfaces: Vec::new(),
2696 health_status: Some("UNKNOWN".into()),
2697 managed_agents: None,
2698 })
2699 .collect();
2700 let awslogs = container_defs.iter().find_map(|def| {
2701 let name = def.get("name").and_then(|v| v.as_str())?.to_string();
2702 let log_cfg = def.get("logConfiguration")?;
2703 if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
2704 return None;
2705 }
2706 let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
2707 Some(AwsLogsConfig {
2708 group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
2709 stream_prefix: opts
2710 .get("awslogs-stream-prefix")
2711 .and_then(|v| v.as_str())
2712 .map(String::from),
2713 region: opts
2714 .get("awslogs-region")
2715 .and_then(|v| v.as_str())
2716 .unwrap_or(&state.region)
2717 .to_string(),
2718 container_name: name,
2719 })
2720 });
2721 let task = Task {
2722 task_arn: task_arn.clone(),
2723 task_id: task_id.clone(),
2724 cluster_arn: cluster_arn.clone(),
2725 cluster_name: cluster_name.clone(),
2726 task_definition_arn: td_arn.clone(),
2727 family: family.clone(),
2728 revision,
2729 last_status: "PENDING".into(),
2730 desired_status: "RUNNING".into(),
2731 launch_type: launch_type.into(),
2732 platform_version: Some("1.4.0".into()),
2733 cpu: cpu.clone(),
2734 memory: memory.clone(),
2735 containers,
2736 overrides: json!({}),
2737 started_by: Some(service_tag.clone()),
2738 group: Some(format!("service:{}", service.service_name)),
2739 connectivity: "CONNECTING".into(),
2740 stop_code: None,
2741 stopped_reason: None,
2742 created_at: Utc::now(),
2743 started_at: None,
2744 stopping_at: None,
2745 stopped_at: None,
2746 pull_started_at: None,
2747 pull_stopped_at: None,
2748 connectivity_at: None,
2749 started_by_ref_id: None,
2750 execution_role_arn: exec_role.clone(),
2751 task_role_arn: task_role.clone(),
2752 tags: Vec::new(),
2753 awslogs,
2754 captured_logs: String::new(),
2755 protection: None,
2756 };
2757 state.tasks.insert(task_id.clone(), task);
2758 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2759 cluster.pending_tasks_count += 1;
2760 }
2761 ids.push(task_id);
2762 }
2763 let _ = principal_arn;
2764 ids
2765}
2766
2767fn recompute_service_counts(
2768 state: &EcsState,
2769 service_name: &str,
2770 cluster_name: &str,
2771 service_json: &mut Value,
2772) {
2773 let service_tag = format!("ecs-svc/{}", service_name);
2774 let mut running = 0i32;
2775 let mut pending = 0i32;
2776 for t in state.tasks.values() {
2777 if t.started_by.as_deref() == Some(service_tag.as_str()) && t.cluster_name == cluster_name {
2778 match t.last_status.as_str() {
2779 "RUNNING" => running += 1,
2780 "PENDING" | "PROVISIONING" => pending += 1,
2781 _ => {}
2782 }
2783 }
2784 }
2785 if let Some(map) = service_json.as_object_mut() {
2786 map.insert("runningCount".into(), json!(running));
2787 map.insert("pendingCount".into(), json!(pending));
2788 }
2789}
2790
2791fn service_to_json(svc: &Service) -> Value {
2792 let mut map = serde_json::Map::new();
2793 map.insert("serviceArn".into(), json!(svc.service_arn));
2794 map.insert("serviceName".into(), json!(svc.service_name));
2795 map.insert("clusterArn".into(), json!(svc.cluster_arn));
2796 map.insert("status".into(), json!(svc.status));
2797 map.insert("desiredCount".into(), json!(svc.desired_count));
2798 map.insert("runningCount".into(), json!(svc.running_count));
2799 map.insert("pendingCount".into(), json!(svc.pending_count));
2800 map.insert("launchType".into(), json!(svc.launch_type));
2801 map.insert("schedulingStrategy".into(), json!(svc.scheduling_strategy));
2802 map.insert("taskDefinition".into(), json!(svc.task_definition_arn));
2803 map.insert(
2804 "deploymentController".into(),
2805 json!({"type": svc.deployment_controller}),
2806 );
2807 let mut deployment_cfg = serde_json::Map::new();
2808 if let Some(n) = svc.minimum_healthy_percent {
2809 deployment_cfg.insert("minimumHealthyPercent".into(), json!(n));
2810 }
2811 if let Some(n) = svc.maximum_percent {
2812 deployment_cfg.insert("maximumPercent".into(), json!(n));
2813 }
2814 if let Some(ref cb) = svc.circuit_breaker {
2815 deployment_cfg.insert(
2816 "deploymentCircuitBreaker".into(),
2817 json!({"enable": cb.enable, "rollback": cb.rollback}),
2818 );
2819 }
2820 if !deployment_cfg.is_empty() {
2821 map.insert(
2822 "deploymentConfiguration".into(),
2823 Value::Object(deployment_cfg),
2824 );
2825 }
2826 map.insert(
2827 "deployments".into(),
2828 Value::Array(svc.deployments.iter().map(deployment_to_json).collect()),
2829 );
2830 map.insert(
2831 "loadBalancers".into(),
2832 Value::Array(svc.load_balancers.clone()),
2833 );
2834 map.insert(
2835 "serviceRegistries".into(),
2836 Value::Array(svc.service_registries.clone()),
2837 );
2838 map.insert(
2839 "placementConstraints".into(),
2840 Value::Array(svc.placement_constraints.clone()),
2841 );
2842 map.insert(
2843 "placementStrategy".into(),
2844 Value::Array(svc.placement_strategy.clone()),
2845 );
2846 if let Some(ref v) = svc.network_configuration {
2847 map.insert("networkConfiguration".into(), v.clone());
2848 }
2849 if let Some(ref v) = svc.role_arn {
2850 map.insert("roleArn".into(), json!(v));
2851 }
2852 if let Some(ref v) = svc.created_by {
2853 map.insert("createdBy".into(), json!(v));
2854 }
2855 map.insert("createdAt".into(), json!(svc.created_at.timestamp()));
2856 Value::Object(map)
2857}
2858
2859fn deployment_to_json(d: &Deployment) -> Value {
2860 json!({
2861 "id": d.deployment_id,
2862 "status": d.status,
2863 "taskDefinition": d.task_definition_arn,
2864 "desiredCount": d.desired_count,
2865 "pendingCount": d.pending_count,
2866 "runningCount": d.running_count,
2867 "failedTasks": d.failed_tasks,
2868 "createdAt": d.created_at.timestamp(),
2869 "updatedAt": d.updated_at.timestamp(),
2870 "launchType": d.launch_type,
2871 "rolloutState": d.rollout_state,
2872 "rolloutStateReason": d.rollout_state_reason,
2873 })
2874}
2875
2876impl EcsService {
2879 fn register_container_instance(
2880 &self,
2881 request: &AwsRequest,
2882 ) -> Result<AwsResponse, AwsServiceError> {
2883 let body = request.json_body();
2884 let cluster_ref = opt_str(&body, "cluster");
2885 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2886 let ec2_id = opt_str(&body, "instanceIdentityDocument")
2887 .and_then(|s| serde_json::from_str::<Value>(s).ok())
2888 .and_then(|v| {
2889 v.get("instanceId")
2890 .and_then(|x| x.as_str())
2891 .map(String::from)
2892 });
2893 let tags = parse_tags(&body);
2894
2895 let account = request.account_id.clone();
2896 let mut accounts = self.state.write();
2897 let state = accounts.get_or_create(&account);
2898 let cluster_arn = state
2899 .clusters
2900 .get(&cluster_name)
2901 .map(|c| c.cluster_arn.clone())
2902 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
2903 let uuid = uuid::Uuid::new_v4().to_string();
2904 let ci_arn = state.container_instance_arn(&cluster_name, &uuid);
2905 let key = format!("{}/{}", cluster_name, uuid);
2906 let ci = ContainerInstance {
2907 container_instance_arn: ci_arn.clone(),
2908 ec2_instance_id: ec2_id,
2909 cluster_name: cluster_name.clone(),
2910 cluster_arn,
2911 status: "ACTIVE".into(),
2912 version: 1,
2913 version_info: body.get("versionInfo").cloned(),
2914 agent_connected: true,
2915 agent_update_status: None,
2916 remaining_resources: body
2917 .get("totalResources")
2918 .and_then(|v| v.as_array())
2919 .cloned()
2920 .unwrap_or_default(),
2921 registered_resources: body
2922 .get("totalResources")
2923 .and_then(|v| v.as_array())
2924 .cloned()
2925 .unwrap_or_default(),
2926 running_tasks_count: 0,
2927 pending_tasks_count: 0,
2928 registered_at: Utc::now(),
2929 attributes: body
2930 .get("attributes")
2931 .and_then(|v| v.as_array())
2932 .map(|arr| {
2933 arr.iter()
2934 .filter_map(|a| {
2935 let name = a.get("name").and_then(|v| v.as_str())?;
2936 Some(AttributeRef {
2937 name: name.to_string(),
2938 value: a.get("value").and_then(|v| v.as_str()).map(String::from),
2939 target_type: a
2940 .get("targetType")
2941 .and_then(|v| v.as_str())
2942 .map(String::from),
2943 target_id: a
2944 .get("targetId")
2945 .and_then(|v| v.as_str())
2946 .map(String::from),
2947 })
2948 })
2949 .collect()
2950 })
2951 .unwrap_or_default(),
2952 tags,
2953 capacity_provider_name: None,
2954 health_status: None,
2955 };
2956 state.container_instances.insert(key, ci.clone());
2957 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2958 cluster.registered_container_instances_count += 1;
2959 }
2960 Ok(AwsResponse::ok_json(json!({
2961 "containerInstance": container_instance_to_json(&ci),
2962 })))
2963 }
2964
2965 fn deregister_container_instance(
2966 &self,
2967 request: &AwsRequest,
2968 ) -> Result<AwsResponse, AwsServiceError> {
2969 let body = request.json_body();
2970 let ci_ref = req_str(&body, "containerInstance")?.to_string();
2971 let cluster_ref = opt_str(&body, "cluster");
2972 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
2973 let id = container_instance_id_from_ref(&ci_ref);
2974 let key = format!("{}/{}", cluster_name, id);
2975
2976 let account = request.account_id.clone();
2977 let mut accounts = self.state.write();
2978 let state = accounts
2979 .get_mut(&account)
2980 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
2981 let mut ci = state
2982 .container_instances
2983 .remove(&key)
2984 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
2985 ci.status = "INACTIVE".into();
2986 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
2987 if cluster.registered_container_instances_count > 0 {
2988 cluster.registered_container_instances_count -= 1;
2989 }
2990 }
2991 Ok(AwsResponse::ok_json(json!({
2992 "containerInstance": container_instance_to_json(&ci),
2993 })))
2994 }
2995
2996 fn describe_container_instances(
2997 &self,
2998 request: &AwsRequest,
2999 ) -> Result<AwsResponse, AwsServiceError> {
3000 let body = request.json_body();
3001 let cluster_ref = opt_str(&body, "cluster");
3002 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3003 let refs: Vec<String> = body
3004 .get("containerInstances")
3005 .and_then(|v| v.as_array())
3006 .map(|arr| {
3007 arr.iter()
3008 .filter_map(|v| v.as_str().map(String::from))
3009 .collect()
3010 })
3011 .unwrap_or_default();
3012
3013 let accounts = self.state.read();
3014 let mut found = Vec::new();
3015 let mut failures = Vec::new();
3016 if let Some(state) = accounts.get(&request.account_id) {
3017 for r in &refs {
3018 let id = container_instance_id_from_ref(r);
3019 let key = format!("{}/{}", cluster_name, id);
3020 match state.container_instances.get(&key) {
3021 Some(ci) => found.push(container_instance_to_json(ci)),
3022 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3023 }
3024 }
3025 } else {
3026 for r in &refs {
3027 failures.push(json!({"arn": r, "reason": "MISSING"}));
3028 }
3029 }
3030 Ok(AwsResponse::ok_json(json!({
3031 "containerInstances": found,
3032 "failures": failures,
3033 })))
3034 }
3035
3036 fn list_container_instances(
3037 &self,
3038 request: &AwsRequest,
3039 ) -> Result<AwsResponse, AwsServiceError> {
3040 let body = request.json_body();
3041 let cluster_ref = opt_str(&body, "cluster");
3042 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3043 let status_filter = opt_str(&body, "status");
3044 let max_results = body
3045 .get("maxResults")
3046 .and_then(|v| v.as_i64())
3047 .filter(|n| (1..=100).contains(n))
3048 .map(|n| n as usize)
3049 .unwrap_or(100);
3050 let next_token = opt_str(&body, "nextToken").unwrap_or("");
3051
3052 let accounts = self.state.read();
3053 let mut arns: Vec<String> = match accounts.get(&request.account_id) {
3054 Some(state) => state
3055 .container_instances
3056 .values()
3057 .filter(|ci| ci.cluster_name == cluster_name)
3058 .filter(|ci| status_filter.is_none_or(|s| ci.status == s))
3059 .map(|ci| ci.container_instance_arn.clone())
3060 .collect(),
3061 None => Vec::new(),
3062 };
3063 arns.sort();
3064 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
3065 let end = (start + max_results).min(arns.len());
3066 let page = arns[start..end].to_vec();
3067 let mut out = json!({"containerInstanceArns": page});
3068 if end < arns.len() {
3069 out.as_object_mut()
3070 .unwrap()
3071 .insert("nextToken".into(), json!(end.to_string()));
3072 }
3073 Ok(AwsResponse::ok_json(out))
3074 }
3075
3076 fn update_container_agent(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3077 let body = request.json_body();
3078 let ci_ref = req_str(&body, "containerInstance")?.to_string();
3079 let cluster_ref = opt_str(&body, "cluster");
3080 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3081 let id = container_instance_id_from_ref(&ci_ref);
3082 let key = format!("{}/{}", cluster_name, id);
3083 let mut accounts = self.state.write();
3084 let state = accounts
3085 .get_mut(&request.account_id)
3086 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3087 let ci = state
3088 .container_instances
3089 .get_mut(&key)
3090 .ok_or_else(|| container_instance_not_found(&ci_ref))?;
3091 ci.agent_update_status = Some("UPDATED".into());
3092 Ok(AwsResponse::ok_json(json!({
3093 "containerInstance": container_instance_to_json(ci),
3094 })))
3095 }
3096
3097 fn update_container_instances_state(
3098 &self,
3099 request: &AwsRequest,
3100 ) -> Result<AwsResponse, AwsServiceError> {
3101 let body = request.json_body();
3102 let status = req_str(&body, "status")?.to_string();
3103 let cluster_ref = opt_str(&body, "cluster");
3104 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3105 let refs: Vec<String> = body
3106 .get("containerInstances")
3107 .and_then(|v| v.as_array())
3108 .map(|arr| {
3109 arr.iter()
3110 .filter_map(|v| v.as_str().map(String::from))
3111 .collect()
3112 })
3113 .unwrap_or_default();
3114 let mut accounts = self.state.write();
3115 let state = accounts
3116 .get_mut(&request.account_id)
3117 .ok_or_else(|| client_exception("account not found"))?;
3118 let mut found = Vec::new();
3119 let mut failures = Vec::new();
3120 for r in &refs {
3121 let id = container_instance_id_from_ref(r);
3122 let key = format!("{}/{}", cluster_name, id);
3123 match state.container_instances.get_mut(&key) {
3124 Some(ci) => {
3125 ci.status = status.clone();
3126 found.push(container_instance_to_json(ci));
3127 }
3128 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3129 }
3130 }
3131 Ok(AwsResponse::ok_json(json!({
3132 "containerInstances": found,
3133 "failures": failures,
3134 })))
3135 }
3136
3137 fn put_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3138 let body = request.json_body();
3139 let cluster_ref = opt_str(&body, "cluster");
3140 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3141 let attrs = body
3142 .get("attributes")
3143 .and_then(|v| v.as_array())
3144 .cloned()
3145 .unwrap_or_default();
3146
3147 let mut accounts = self.state.write();
3148 let state = accounts.get_or_create(&request.account_id);
3149 let mut stored = Vec::new();
3150 for a in &attrs {
3151 let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3152 continue;
3153 };
3154 let target_type = a
3155 .get("targetType")
3156 .and_then(|v| v.as_str())
3157 .unwrap_or("container-instance")
3158 .to_string();
3159 let target_id = a
3160 .get("targetId")
3161 .and_then(|v| v.as_str())
3162 .unwrap_or("")
3163 .to_string();
3164 let value = a.get("value").and_then(|v| v.as_str()).map(String::from);
3165 let key = format!("{}/{}/{}", cluster_name, target_id, name);
3166 let attr = Attribute {
3167 cluster_name: cluster_name.clone(),
3168 target_type: target_type.clone(),
3169 target_id: target_id.clone(),
3170 name: name.to_string(),
3171 value: value.clone(),
3172 };
3173 state.attributes.insert(key, attr);
3174 stored.push(json!({
3175 "name": name,
3176 "value": value,
3177 "targetType": target_type,
3178 "targetId": target_id,
3179 }));
3180 }
3181 Ok(AwsResponse::ok_json(json!({"attributes": stored})))
3182 }
3183
3184 fn delete_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3185 let body = request.json_body();
3186 let cluster_ref = opt_str(&body, "cluster");
3187 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3188 let attrs = body
3189 .get("attributes")
3190 .and_then(|v| v.as_array())
3191 .cloned()
3192 .unwrap_or_default();
3193 let mut accounts = self.state.write();
3194 let state = accounts.get_or_create(&request.account_id);
3195 let mut deleted = Vec::new();
3196 for a in &attrs {
3197 let Some(name) = a.get("name").and_then(|v| v.as_str()) else {
3198 continue;
3199 };
3200 let target_id = a.get("targetId").and_then(|v| v.as_str()).unwrap_or("");
3201 let key = format!("{}/{}/{}", cluster_name, target_id, name);
3202 if let Some(attr) = state.attributes.remove(&key) {
3203 deleted.push(json!({
3204 "name": attr.name,
3205 "value": attr.value,
3206 "targetType": attr.target_type,
3207 "targetId": attr.target_id,
3208 }));
3209 }
3210 }
3211 Ok(AwsResponse::ok_json(json!({"attributes": deleted})))
3212 }
3213
3214 fn list_attributes(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3215 let body = request.json_body();
3216 let cluster_ref = opt_str(&body, "cluster");
3217 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3218 let target_type = req_str(&body, "targetType")?.to_string();
3219 let attr_name = opt_str(&body, "attributeName");
3220 let attr_value = opt_str(&body, "attributeValue");
3221
3222 let accounts = self.state.read();
3223 let attrs: Vec<Value> = match accounts.get(&request.account_id) {
3224 Some(state) => state
3225 .attributes
3226 .values()
3227 .filter(|a| a.cluster_name == cluster_name)
3228 .filter(|a| a.target_type == target_type)
3229 .filter(|a| attr_name.is_none_or(|n| a.name == n))
3230 .filter(|a| attr_value.is_none_or(|v| a.value.as_deref() == Some(v)))
3231 .map(|a| {
3232 json!({
3233 "name": a.name,
3234 "value": a.value,
3235 "targetType": a.target_type,
3236 "targetId": a.target_id,
3237 })
3238 })
3239 .collect(),
3240 None => Vec::new(),
3241 };
3242 Ok(AwsResponse::ok_json(json!({"attributes": attrs})))
3243 }
3244
3245 fn create_capacity_provider(
3246 &self,
3247 request: &AwsRequest,
3248 ) -> Result<AwsResponse, AwsServiceError> {
3249 let body = request.json_body();
3250 let name = req_str(&body, "name")?.to_string();
3251 if name.starts_with("aws") || name.starts_with("ecs") {
3252 return Err(invalid_parameter(format!(
3253 "Capacity provider name cannot begin with 'aws' or 'ecs': {name}"
3254 )));
3255 }
3256 let auto_scaling_group_provider = body.get("autoScalingGroupProvider").cloned();
3257 let tags = parse_tags(&body);
3258
3259 let mut accounts = self.state.write();
3260 let state = accounts.get_or_create(&request.account_id);
3261 if state.capacity_providers.contains_key(&name) {
3262 return Err(client_exception(format!(
3263 "Capacity provider already exists: {name}"
3264 )));
3265 }
3266 let arn = format!(
3267 "arn:aws:ecs:{}:{}:capacity-provider/{}",
3268 state.region, state.account_id, name
3269 );
3270 let cp = CapacityProvider {
3271 name: name.clone(),
3272 arn,
3273 status: "ACTIVE".into(),
3274 auto_scaling_group_provider,
3275 update_status: None,
3276 update_status_reason: None,
3277 created_at: Utc::now(),
3278 tags,
3279 };
3280 state.capacity_providers.insert(name.clone(), cp.clone());
3281 Ok(AwsResponse::ok_json(json!({
3282 "capacityProvider": capacity_provider_to_json(&cp),
3283 })))
3284 }
3285
3286 fn delete_capacity_provider(
3287 &self,
3288 request: &AwsRequest,
3289 ) -> Result<AwsResponse, AwsServiceError> {
3290 let body = request.json_body();
3291 let input = req_str(&body, "capacityProvider")?.to_string();
3292 let name = capacity_provider_name_from_ref(&input);
3293 let mut accounts = self.state.write();
3294 let state = accounts
3295 .get_mut(&request.account_id)
3296 .ok_or_else(|| capacity_provider_not_found(&name))?;
3297 let mut cp = state
3298 .capacity_providers
3299 .remove(&name)
3300 .ok_or_else(|| capacity_provider_not_found(&name))?;
3301 cp.status = "INACTIVE".into();
3302 Ok(AwsResponse::ok_json(json!({
3303 "capacityProvider": capacity_provider_to_json(&cp),
3304 })))
3305 }
3306
3307 fn describe_capacity_providers(
3308 &self,
3309 request: &AwsRequest,
3310 ) -> Result<AwsResponse, AwsServiceError> {
3311 let body = request.json_body();
3312 let names: Vec<String> = body
3313 .get("capacityProviders")
3314 .and_then(|v| v.as_array())
3315 .map(|arr| {
3316 arr.iter()
3317 .filter_map(|v| v.as_str().map(capacity_provider_name_from_ref))
3318 .collect()
3319 })
3320 .unwrap_or_default();
3321 let accounts = self.state.read();
3322 let mut found = Vec::new();
3323 let mut failures = Vec::new();
3324 if let Some(state) = accounts.get(&request.account_id) {
3325 if names.is_empty() {
3326 for cp in state.capacity_providers.values() {
3327 found.push(capacity_provider_to_json(cp));
3328 }
3329 } else {
3330 for n in &names {
3331 match state.capacity_providers.get(n) {
3332 Some(cp) => found.push(capacity_provider_to_json(cp)),
3333 None => failures.push(json!({"arn": n, "reason": "MISSING"})),
3334 }
3335 }
3336 }
3337 }
3338 Ok(AwsResponse::ok_json(json!({
3339 "capacityProviders": found,
3340 "failures": failures,
3341 })))
3342 }
3343
3344 fn update_capacity_provider(
3345 &self,
3346 request: &AwsRequest,
3347 ) -> Result<AwsResponse, AwsServiceError> {
3348 let body = request.json_body();
3349 let input = req_str(&body, "name")?.to_string();
3350 let name = capacity_provider_name_from_ref(&input);
3351 let asg = body.get("autoScalingGroupProvider").cloned();
3352 let mut accounts = self.state.write();
3353 let state = accounts
3354 .get_mut(&request.account_id)
3355 .ok_or_else(|| capacity_provider_not_found(&name))?;
3356 let cp = state
3357 .capacity_providers
3358 .get_mut(&name)
3359 .ok_or_else(|| capacity_provider_not_found(&name))?;
3360 if let Some(v) = asg {
3361 cp.auto_scaling_group_provider = Some(v);
3362 }
3363 cp.update_status = Some("UPDATE_COMPLETE".into());
3364 Ok(AwsResponse::ok_json(json!({
3365 "capacityProvider": capacity_provider_to_json(cp),
3366 })))
3367 }
3368
3369 fn get_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3370 let body = request.json_body();
3371 let refs: Vec<String> = body
3372 .get("tasks")
3373 .and_then(|v| v.as_array())
3374 .map(|arr| {
3375 arr.iter()
3376 .filter_map(|v| v.as_str().map(String::from))
3377 .collect()
3378 })
3379 .unwrap_or_default();
3380 let accounts = self.state.read();
3381 let mut protections = Vec::new();
3382 let mut failures = Vec::new();
3383 if let Some(state) = accounts.get(&request.account_id) {
3384 for r in &refs {
3385 let id = task_id_from_ref(r);
3386 match state.tasks.get(&id) {
3387 Some(t) => protections.push(task_protection_json(t)),
3388 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3389 }
3390 }
3391 }
3392 Ok(AwsResponse::ok_json(json!({
3393 "protectedTasks": protections,
3394 "failures": failures,
3395 })))
3396 }
3397
3398 fn update_task_protection(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3399 let body = request.json_body();
3400 let refs: Vec<String> = body
3401 .get("tasks")
3402 .and_then(|v| v.as_array())
3403 .map(|arr| {
3404 arr.iter()
3405 .filter_map(|v| v.as_str().map(String::from))
3406 .collect()
3407 })
3408 .unwrap_or_default();
3409 let protect = body
3410 .get("protectionEnabled")
3411 .and_then(|v| v.as_bool())
3412 .unwrap_or(false);
3413 let expires_in_minutes = body
3414 .get("expiresInMinutes")
3415 .and_then(|v| v.as_i64())
3416 .unwrap_or(2880);
3417 let expiration = if protect {
3418 Some(Utc::now() + chrono::Duration::minutes(expires_in_minutes))
3419 } else {
3420 None
3421 };
3422
3423 let mut accounts = self.state.write();
3424 let state = accounts
3425 .get_mut(&request.account_id)
3426 .ok_or_else(|| client_exception("account not found"))?;
3427 let mut protections = Vec::new();
3428 let mut failures = Vec::new();
3429 for r in &refs {
3430 let id = task_id_from_ref(r);
3431 match state.tasks.get_mut(&id) {
3432 Some(t) => {
3433 t.protection = Some(crate::state::TaskProtection {
3434 enabled: protect,
3435 expiration,
3436 });
3437 protections.push(task_protection_json(t));
3438 }
3439 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3440 }
3441 }
3442 Ok(AwsResponse::ok_json(json!({
3443 "protectedTasks": protections,
3444 "failures": failures,
3445 })))
3446 }
3447
3448 fn create_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3449 let body = request.json_body();
3450 let service_ref = req_str(&body, "service")?;
3451 let service_name = service_name_from_ref(service_ref);
3452 let cluster_ref = opt_str(&body, "cluster");
3453 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3454 let task_definition = req_str(&body, "taskDefinition")?.to_string();
3455 let external_id = opt_str(&body, "externalId").map(String::from);
3456 let launch_type = opt_str(&body, "launchType").map(String::from);
3457 let platform_version = opt_str(&body, "platformVersion").map(String::from);
3458 let scale = body.get("scale").cloned();
3459 let tags = parse_tags(&body);
3460 let load_balancers = body
3461 .get("loadBalancers")
3462 .and_then(|v| v.as_array())
3463 .cloned()
3464 .unwrap_or_default();
3465 let service_registries = body
3466 .get("serviceRegistries")
3467 .and_then(|v| v.as_array())
3468 .cloned()
3469 .unwrap_or_default();
3470 let capacity_provider_strategy = body
3471 .get("capacityProviderStrategy")
3472 .and_then(|v| v.as_array())
3473 .cloned()
3474 .unwrap_or_default();
3475
3476 let mut accounts = self.state.write();
3477 let state = accounts
3478 .get_mut(&request.account_id)
3479 .ok_or_else(|| service_not_found(&service_name))?;
3480 let service_key = EcsState::service_key(&cluster_name, &service_name);
3481 let svc = state
3482 .services
3483 .get(&service_key)
3484 .ok_or_else(|| service_not_found(&service_name))?;
3485 if svc.deployment_controller != "EXTERNAL" {
3486 return Err(client_exception(
3487 "CreateTaskSet requires the service to be created with \
3488 deploymentController.type = EXTERNAL",
3489 ));
3490 }
3491 let ts_id = format!("ecs-svc-{}", uuid::Uuid::new_v4().simple());
3492 let task_set = TaskSet {
3493 task_set_id: ts_id.clone(),
3494 task_set_arn: format!(
3495 "arn:aws:ecs:{}:{}:task-set/{}/{}/{}",
3496 state.region, state.account_id, cluster_name, service_name, ts_id
3497 ),
3498 service_arn: svc.service_arn.clone(),
3499 cluster_arn: svc.cluster_arn.clone(),
3500 service_name: service_name.clone(),
3501 cluster_name: cluster_name.clone(),
3502 external_id,
3503 status: "ACTIVE".into(),
3504 task_definition,
3505 computed_desired_count: 0,
3506 pending_count: 0,
3507 running_count: 0,
3508 launch_type,
3509 platform_version,
3510 scale,
3511 stability_status: "STABILIZING".into(),
3512 created_at: Utc::now(),
3513 updated_at: Utc::now(),
3514 load_balancers,
3515 service_registries,
3516 capacity_provider_strategy,
3517 tags,
3518 };
3519 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3520 state.task_sets.insert(key, task_set.clone());
3521 Ok(AwsResponse::ok_json(json!({
3522 "taskSet": task_set_to_json(&task_set),
3523 })))
3524 }
3525
3526 fn update_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3527 let body = request.json_body();
3528 let ts_ref = req_str(&body, "taskSet")?.to_string();
3529 let service_ref = req_str(&body, "service")?;
3530 let service_name = service_name_from_ref(service_ref);
3531 let cluster_ref = opt_str(&body, "cluster");
3532 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3533 let scale = body.get("scale").cloned();
3534
3535 let mut accounts = self.state.write();
3536 let state = accounts
3537 .get_mut(&request.account_id)
3538 .ok_or_else(|| client_exception("task set not found"))?;
3539 let ts_id = task_set_id_from_ref(&ts_ref);
3540 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3541 let ts = state
3542 .task_sets
3543 .get_mut(&key)
3544 .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3545 if let Some(v) = scale {
3546 ts.scale = Some(v);
3547 }
3548 ts.updated_at = Utc::now();
3549 Ok(AwsResponse::ok_json(json!({
3550 "taskSet": task_set_to_json(ts),
3551 })))
3552 }
3553
3554 fn delete_task_set(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3555 let body = request.json_body();
3556 let ts_ref = req_str(&body, "taskSet")?.to_string();
3557 let service_ref = req_str(&body, "service")?;
3558 let service_name = service_name_from_ref(service_ref);
3559 let cluster_ref = opt_str(&body, "cluster");
3560 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3561 let ts_id = task_set_id_from_ref(&ts_ref);
3562 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3563
3564 let mut accounts = self.state.write();
3565 let state = accounts
3566 .get_mut(&request.account_id)
3567 .ok_or_else(|| client_exception("task set not found"))?;
3568 let mut ts = state
3569 .task_sets
3570 .remove(&key)
3571 .ok_or_else(|| client_exception(format!("task set not found: {}", ts_ref)))?;
3572 ts.status = "DRAINING".into();
3573 Ok(AwsResponse::ok_json(json!({
3574 "taskSet": task_set_to_json(&ts),
3575 })))
3576 }
3577
3578 fn describe_task_sets(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3579 let body = request.json_body();
3580 let service_ref = req_str(&body, "service")?;
3581 let service_name = service_name_from_ref(service_ref);
3582 let cluster_ref = opt_str(&body, "cluster");
3583 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3584 let filter_refs: Vec<String> = body
3585 .get("taskSets")
3586 .and_then(|v| v.as_array())
3587 .map(|arr| {
3588 arr.iter()
3589 .filter_map(|v| v.as_str().map(String::from))
3590 .collect()
3591 })
3592 .unwrap_or_default();
3593
3594 let accounts = self.state.read();
3595 let mut found = Vec::new();
3596 let mut failures = Vec::new();
3597 if let Some(state) = accounts.get(&request.account_id) {
3598 if filter_refs.is_empty() {
3599 for ts in state.task_sets.values() {
3600 if ts.cluster_name == cluster_name && ts.service_name == service_name {
3601 found.push(task_set_to_json(ts));
3602 }
3603 }
3604 } else {
3605 for r in &filter_refs {
3606 let id = task_set_id_from_ref(r);
3607 let key = format!("{}/{}/{}", cluster_name, service_name, id);
3608 match state.task_sets.get(&key) {
3609 Some(ts) => found.push(task_set_to_json(ts)),
3610 None => failures.push(json!({"arn": r, "reason": "MISSING"})),
3611 }
3612 }
3613 }
3614 }
3615 Ok(AwsResponse::ok_json(json!({
3616 "taskSets": found,
3617 "failures": failures,
3618 })))
3619 }
3620
3621 fn update_service_primary_task_set(
3622 &self,
3623 request: &AwsRequest,
3624 ) -> Result<AwsResponse, AwsServiceError> {
3625 let body = request.json_body();
3626 let ts_ref = req_str(&body, "primaryTaskSet")?.to_string();
3627 let service_ref = req_str(&body, "service")?;
3628 let service_name = service_name_from_ref(service_ref);
3629 let cluster_ref = opt_str(&body, "cluster");
3630 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3631 let ts_id = task_set_id_from_ref(&ts_ref);
3632 let key = format!("{}/{}/{}", cluster_name, service_name, ts_id);
3633 let mut accounts = self.state.write();
3634 let state = accounts
3635 .get_mut(&request.account_id)
3636 .ok_or_else(|| client_exception("task set not found"))?;
3637 if !state.task_sets.contains_key(&key) {
3638 return Err(client_exception(format!("task set not found: {}", ts_ref)));
3639 }
3640 for ts in state.task_sets.values_mut() {
3644 if ts.service_name == service_name
3645 && ts.cluster_name == cluster_name
3646 && ts.status == "PRIMARY"
3647 && ts.task_set_id != ts_id
3648 {
3649 ts.status = "ACTIVE".into();
3650 ts.updated_at = Utc::now();
3651 }
3652 }
3653 let ts = state.task_sets.get_mut(&key).unwrap();
3654 ts.status = "PRIMARY".into();
3655 ts.updated_at = Utc::now();
3656 Ok(AwsResponse::ok_json(json!({
3657 "taskSet": task_set_to_json(ts),
3658 })))
3659 }
3660
3661 async fn execute_command(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3662 let body = request.json_body();
3663 let task_ref = req_str(&body, "task")?.to_string();
3664 let command = req_str(&body, "command")?.to_string();
3665 let interactive = body
3666 .get("interactive")
3667 .and_then(|v| v.as_bool())
3668 .unwrap_or(false);
3669
3670 let container_id = {
3672 let accounts = self.state.read();
3673 let state = accounts
3674 .get(&request.account_id)
3675 .ok_or_else(|| task_not_found(&task_ref))?;
3676 let id = task_id_from_ref(&task_ref);
3677 state
3678 .tasks
3679 .get(&id)
3680 .and_then(|t| t.containers.first())
3681 .and_then(|c| c.runtime_id.clone())
3682 };
3683
3684 let session_id = format!("ecs-execute-command-{}", uuid::Uuid::new_v4());
3685 if let (Some(id), Some(_rt)) = (container_id.clone(), self.runtime.as_ref()) {
3686 let out = tokio::process::Command::new("docker")
3691 .args(["exec", &id, "sh", "-c", &command])
3692 .output()
3693 .await
3694 .map_err(|e| client_exception(format!("docker exec failed: {e}")))?;
3695 tracing::info!(
3696 task = %task_ref,
3697 exit = out.status.code().unwrap_or(-1),
3698 "ExecuteCommand via docker exec"
3699 );
3700 }
3701
3702 Ok(AwsResponse::ok_json(json!({
3703 "clusterArn": opt_str(&body, "cluster").unwrap_or(""),
3704 "containerArn": container_id.unwrap_or_default(),
3705 "containerName": opt_str(&body, "container").unwrap_or(""),
3706 "interactive": interactive,
3707 "session": {
3708 "sessionId": session_id,
3709 "streamUrl": "",
3710 "tokenValue": "",
3711 },
3712 "taskArn": task_ref,
3713 })))
3714 }
3715
3716 fn submit_container_state_change(
3717 &self,
3718 request: &AwsRequest,
3719 ) -> Result<AwsResponse, AwsServiceError> {
3720 let body = request.json_body();
3725 let task_ref = opt_str(&body, "task").unwrap_or("");
3726 let container_name = opt_str(&body, "containerName").unwrap_or("");
3727 let status = opt_str(&body, "status").map(String::from);
3728 let exit_code = body.get("exitCode").and_then(|v| v.as_i64());
3729 let reason = opt_str(&body, "reason").map(String::from);
3730
3731 if !task_ref.is_empty() {
3732 let mut accounts = self.state.write();
3733 if let Some(state) = accounts.get_mut(&request.account_id) {
3734 let id = task_id_from_ref(task_ref);
3735 if let Some(task) = state.tasks.get_mut(&id) {
3736 if let Some(container) = task
3737 .containers
3738 .iter_mut()
3739 .find(|c| c.name == container_name)
3740 {
3741 if let Some(s) = status {
3742 container.last_status = s;
3743 }
3744 if let Some(code) = exit_code {
3745 container.exit_code = Some(code);
3746 }
3747 if let Some(r) = reason {
3748 container.reason = Some(r);
3749 }
3750 }
3751 }
3752 }
3753 }
3754 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3755 }
3756
3757 fn submit_task_state_change(
3758 &self,
3759 request: &AwsRequest,
3760 ) -> Result<AwsResponse, AwsServiceError> {
3761 let body = request.json_body();
3762 let task_ref = opt_str(&body, "task").unwrap_or("");
3763 let status = opt_str(&body, "status").map(String::from);
3764 if !task_ref.is_empty() {
3765 let mut accounts = self.state.write();
3766 if let Some(state) = accounts.get_mut(&request.account_id) {
3767 let id = task_id_from_ref(task_ref);
3768 if let Some(task) = state.tasks.get_mut(&id) {
3769 if let Some(s) = status {
3770 task.last_status = s;
3771 }
3772 }
3773 }
3774 }
3775 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3776 }
3777
3778 fn submit_attachment_state_changes(
3779 &self,
3780 _request: &AwsRequest,
3781 ) -> Result<AwsResponse, AwsServiceError> {
3782 Ok(AwsResponse::ok_json(json!({"acknowledgment": "OK"})))
3785 }
3786
3787 fn discover_poll_endpoint(
3788 &self,
3789 _request: &AwsRequest,
3790 ) -> Result<AwsResponse, AwsServiceError> {
3791 let accounts = self.state.read();
3795 let endpoint = format!("https://ecs.{}.amazonaws.com/", accounts.region());
3796 Ok(AwsResponse::ok_json(json!({
3797 "endpoint": endpoint,
3798 "telemetryEndpoint": endpoint,
3799 "serviceConnectEndpoint": endpoint,
3800 })))
3801 }
3802
3803 fn stop_service_deployment(
3804 &self,
3805 request: &AwsRequest,
3806 ) -> Result<AwsResponse, AwsServiceError> {
3807 let body = request.json_body();
3808 let deployment_ref = req_str(&body, "serviceDeploymentArn")?.to_string();
3809 let mut accounts = self.state.write();
3810 let state = accounts
3811 .get_mut(&request.account_id)
3812 .ok_or_else(|| client_exception("service deployment not found"))?;
3813 for svc in state.services.values_mut() {
3814 for d in svc.deployments.iter_mut() {
3815 if deployment_ref.contains(&d.deployment_id) {
3816 d.status = "STOPPED".into();
3817 d.rollout_state = "FAILED".into();
3818 d.rollout_state_reason = Some("StopServiceDeployment requested".into());
3819 d.updated_at = Utc::now();
3820 return Ok(AwsResponse::ok_json(json!({
3821 "serviceDeployment": deployment_to_json(d),
3822 })));
3823 }
3824 }
3825 }
3826 Err(client_exception(format!(
3827 "service deployment not found: {deployment_ref}"
3828 )))
3829 }
3830
3831 fn list_service_deployments(
3832 &self,
3833 request: &AwsRequest,
3834 ) -> Result<AwsResponse, AwsServiceError> {
3835 let body = request.json_body();
3836 let service_ref = req_str(&body, "service")?;
3837 let service_name = service_name_from_ref(service_ref);
3838 let cluster_ref = opt_str(&body, "cluster");
3839 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
3840 let accounts = self.state.read();
3841 let mut deployments: Vec<Value> = Vec::new();
3842 if let Some(state) = accounts.get(&request.account_id) {
3843 let key = EcsState::service_key(&cluster_name, &service_name);
3844 if let Some(svc) = state.services.get(&key) {
3845 for d in &svc.deployments {
3846 deployments.push(json!({
3847 "serviceDeploymentArn": format!("{}/{}", svc.service_arn, d.deployment_id),
3848 "serviceArn": svc.service_arn,
3849 "clusterArn": svc.cluster_arn,
3850 "status": d.status,
3851 "createdAt": d.created_at.timestamp(),
3852 "startedAt": d.created_at.timestamp(),
3853 "finishedAt": null,
3854 }));
3855 }
3856 }
3857 }
3858 Ok(AwsResponse::ok_json(json!({
3859 "serviceDeployments": deployments,
3860 })))
3861 }
3862
3863 fn describe_service_deployments(
3864 &self,
3865 request: &AwsRequest,
3866 ) -> Result<AwsResponse, AwsServiceError> {
3867 let body = request.json_body();
3868 let refs: Vec<String> = body
3869 .get("serviceDeploymentArns")
3870 .and_then(|v| v.as_array())
3871 .map(|arr| {
3872 arr.iter()
3873 .filter_map(|v| v.as_str().map(String::from))
3874 .collect()
3875 })
3876 .unwrap_or_default();
3877 let accounts = self.state.read();
3878 let mut found = Vec::new();
3879 let mut failures = Vec::new();
3880 if let Some(state) = accounts.get(&request.account_id) {
3881 'next_ref: for r in &refs {
3882 for svc in state.services.values() {
3883 for d in &svc.deployments {
3884 if r.contains(&d.deployment_id) {
3885 found.push(json!({
3886 "serviceDeploymentArn": r,
3887 "serviceArn": svc.service_arn,
3888 "clusterArn": svc.cluster_arn,
3889 "status": d.status,
3890 "createdAt": d.created_at.timestamp(),
3891 "startedAt": d.created_at.timestamp(),
3892 "finishedAt": null,
3893 "deploymentConfiguration": {
3894 "minimumHealthyPercent": svc.minimum_healthy_percent,
3895 "maximumPercent": svc.maximum_percent,
3896 },
3897 "sourceServiceRevisions": [],
3898 "targetServiceRevision": {
3899 "arn": d.task_definition_arn,
3900 "requestedTaskCount": d.desired_count,
3901 "runningTaskCount": d.running_count,
3902 "pendingTaskCount": d.pending_count,
3903 "failedTasks": d.failed_tasks,
3904 },
3905 }));
3906 continue 'next_ref;
3907 }
3908 }
3909 }
3910 failures.push(json!({"arn": r, "reason": "MISSING"}));
3911 }
3912 }
3913 Ok(AwsResponse::ok_json(json!({
3914 "serviceDeployments": found,
3915 "failures": failures,
3916 })))
3917 }
3918
3919 fn describe_service_revisions(
3920 &self,
3921 request: &AwsRequest,
3922 ) -> Result<AwsResponse, AwsServiceError> {
3923 let body = request.json_body();
3924 let refs: Vec<String> = body
3925 .get("serviceRevisionArns")
3926 .and_then(|v| v.as_array())
3927 .map(|arr| {
3928 arr.iter()
3929 .filter_map(|v| v.as_str().map(String::from))
3930 .collect()
3931 })
3932 .unwrap_or_default();
3933 Ok(AwsResponse::ok_json(json!({
3934 "serviceRevisions": [],
3935 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
3936 })))
3937 }
3938}
3939
/// Extracts a container instance id from a bare id or an ARN-style reference.
///
/// Returns everything after the last `/` in `input`, or all of `input` when
/// it contains no `/`.
fn container_instance_id_from_ref(input: &str) -> String {
    input
        .rsplit_once('/')
        .map_or(input, |(_, id)| id)
        .to_string()
}
3943
3944fn container_instance_not_found(input: &str) -> AwsServiceError {
3945 AwsServiceError::aws_error(
3946 StatusCode::BAD_REQUEST,
3947 "ClientException",
3948 format!("Container instance not found: {input}"),
3949 )
3950}
3951
/// Extracts a capacity provider name from a bare name or an ARN-style
/// reference.
///
/// Returns everything after the last `/` in `input`, or all of `input` when
/// it contains no `/`.
fn capacity_provider_name_from_ref(input: &str) -> String {
    match input.rfind('/') {
        // '/' is a single byte, so `slash + 1` is always a char boundary.
        Some(slash) => input[slash + 1..].to_string(),
        None => input.to_string(),
    }
}
3955
3956fn capacity_provider_not_found(name: &str) -> AwsServiceError {
3957 AwsServiceError::aws_error(
3958 StatusCode::BAD_REQUEST,
3959 "ClientException",
3960 format!("Capacity provider not found: {name}"),
3961 )
3962}
3963
/// Extracts a task set id from a bare id or an ARN-style reference.
///
/// Returns everything after the last `/` in `input`, or all of `input` when
/// it contains no `/`.
fn task_set_id_from_ref(input: &str) -> String {
    let tail = input.split('/').next_back().unwrap_or(input);
    tail.to_owned()
}
3967
3968fn container_instance_to_json(ci: &ContainerInstance) -> Value {
3969 json!({
3970 "containerInstanceArn": ci.container_instance_arn,
3971 "ec2InstanceId": ci.ec2_instance_id,
3972 "status": ci.status,
3973 "version": ci.version,
3974 "versionInfo": ci.version_info,
3975 "agentConnected": ci.agent_connected,
3976 "agentUpdateStatus": ci.agent_update_status,
3977 "remainingResources": ci.remaining_resources,
3978 "registeredResources": ci.registered_resources,
3979 "runningTasksCount": ci.running_tasks_count,
3980 "pendingTasksCount": ci.pending_tasks_count,
3981 "registeredAt": ci.registered_at.timestamp(),
3982 "attributes": ci.attributes.iter().map(|a| json!({
3983 "name": a.name,
3984 "value": a.value,
3985 "targetType": a.target_type,
3986 "targetId": a.target_id,
3987 })).collect::<Vec<_>>(),
3988 "tags": tags_json(&ci.tags),
3989 "capacityProviderName": ci.capacity_provider_name,
3990 "healthStatus": ci.health_status,
3991 })
3992}
3993
3994fn capacity_provider_to_json(cp: &CapacityProvider) -> Value {
3995 json!({
3996 "name": cp.name,
3997 "capacityProviderArn": cp.arn,
3998 "status": cp.status,
3999 "autoScalingGroupProvider": cp.auto_scaling_group_provider,
4000 "updateStatus": cp.update_status,
4001 "updateStatusReason": cp.update_status_reason,
4002 "tags": tags_json(&cp.tags),
4003 })
4004}
4005
4006fn task_set_to_json(ts: &TaskSet) -> Value {
4007 json!({
4008 "id": ts.task_set_id,
4009 "taskSetArn": ts.task_set_arn,
4010 "serviceArn": ts.service_arn,
4011 "clusterArn": ts.cluster_arn,
4012 "externalId": ts.external_id,
4013 "status": ts.status,
4014 "taskDefinition": ts.task_definition,
4015 "computedDesiredCount": ts.computed_desired_count,
4016 "pendingCount": ts.pending_count,
4017 "runningCount": ts.running_count,
4018 "launchType": ts.launch_type,
4019 "platformVersion": ts.platform_version,
4020 "scale": ts.scale,
4021 "stabilityStatus": ts.stability_status,
4022 "createdAt": ts.created_at.timestamp(),
4023 "updatedAt": ts.updated_at.timestamp(),
4024 "loadBalancers": ts.load_balancers,
4025 "serviceRegistries": ts.service_registries,
4026 "capacityProviderStrategy": ts.capacity_provider_strategy,
4027 "tags": tags_json(&ts.tags),
4028 })
4029}
4030
4031fn task_protection_json(task: &Task) -> Value {
4032 let p = task.protection.as_ref();
4033 json!({
4034 "taskArn": task.task_arn,
4035 "protectionEnabled": p.map(|p| p.enabled).unwrap_or(false),
4036 "expirationDate": p.and_then(|p| p.expiration).map(|e| e.timestamp()),
4037 })
4038}
4039
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `TagEntry` from string slices so the tag tests stay compact.
    fn tag(key: &str, value: &str) -> TagEntry {
        TagEntry {
            key: key.into(),
            value: value.into(),
        }
    }

    // ---- parse_family_revision ----

    #[test]
    fn parse_family_revision_with_revision() {
        let (family, revision) = parse_family_revision("web:3");
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn parse_family_revision_without_revision() {
        let (family, revision) = parse_family_revision("web");
        assert_eq!(family, "web");
        assert!(revision.is_none());
    }

    #[test]
    fn parse_family_revision_non_numeric_treated_as_no_revision() {
        // A non-numeric suffix is kept as part of the family name.
        let (family, revision) = parse_family_revision("web:latest");
        assert_eq!(family, "web:latest");
        assert!(revision.is_none());
    }

    // ---- decode_ecs_arn ----

    #[test]
    fn decode_ecs_arn_cluster() {
        let (account_id, resource_type, resource) =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:cluster/prod")
                .expect("cluster ARN should decode");
        assert_eq!(account_id, "111122223333");
        assert_eq!(resource_type, "cluster");
        assert_eq!(resource, "prod");
    }

    #[test]
    fn decode_ecs_arn_task_definition() {
        let (account_id, resource_type, resource) =
            decode_ecs_arn("arn:aws:ecs:us-east-1:111122223333:task-definition/web:5")
                .expect("task definition ARN should decode");
        assert_eq!(account_id, "111122223333");
        assert_eq!(resource_type, "task-definition");
        assert_eq!(resource, "web:5");
    }

    #[test]
    fn decode_ecs_arn_rejects_non_ecs() {
        let decoded = decode_ecs_arn("arn:aws:s3:::bucket");
        assert!(decoded.is_err());
    }

    // ---- validate_family_name ----

    #[test]
    fn validate_family_name_accepts_hyphen_underscore() {
        let outcome = validate_family_name("web_server-2");
        assert!(outcome.is_ok());
    }

    #[test]
    fn validate_family_name_rejects_empty() {
        let outcome = validate_family_name("");
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_family_name_rejects_slash() {
        let outcome = validate_family_name("web/server");
        assert!(outcome.is_err());
    }

    // ---- resolve_task_definition_ref ----

    #[test]
    fn resolve_task_definition_ref_bare_family() {
        let (account_id, family, revision) = resolve_task_definition_ref("web").unwrap();
        assert!(account_id.is_none());
        assert_eq!(family, "web");
        assert!(revision.is_none());
    }

    #[test]
    fn resolve_task_definition_ref_family_revision() {
        let (account_id, family, revision) = resolve_task_definition_ref("web:3").unwrap();
        assert!(account_id.is_none());
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    #[test]
    fn resolve_task_definition_ref_full_arn() {
        let arn = "arn:aws:ecs:us-east-1:111122223333:task-definition/web:3";
        let (account_id, family, revision) = resolve_task_definition_ref(arn).unwrap();
        assert_eq!(account_id.as_deref(), Some("111122223333"));
        assert_eq!(family, "web");
        assert_eq!(revision, Some(3));
    }

    // ---- tag helpers ----

    #[test]
    fn merge_tags_replaces_existing_value() {
        let mut existing = vec![tag("env", "dev")];
        merge_tags(&mut existing, vec![tag("env", "prod")]);
        assert_eq!(existing.len(), 1);
        assert_eq!(existing[0].value, "prod");
    }

    #[test]
    fn merge_tags_adds_new() {
        let mut existing = vec![tag("env", "dev")];
        merge_tags(&mut existing, vec![tag("team", "platform")]);
        assert_eq!(existing.len(), 2);
    }

    #[test]
    fn parse_tags_reads_lowercase_keys() {
        let request_body = json!({
            "tags": [
                {"key": "env", "value": "prod"},
                {"key": "team", "value": "platform"},
            ]
        });
        let parsed = parse_tags(&request_body);
        assert_eq!(parsed.len(), 2);
        let first = &parsed[0];
        assert_eq!(first.key, "env");
        assert_eq!(first.value, "prod");
    }

    // ---- matches_filter ----

    #[test]
    fn matches_filter_respects_none() {
        // (filter, candidate, expected result)
        let cases = [
            (None, "anything", true),
            (Some("x"), "x", true),
            (Some("x"), "y", false),
        ];
        for (filter, candidate, expected) in cases {
            assert_eq!(matches_filter(filter, candidate), expected);
        }
    }
}