1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{
14 Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
15 Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
16 TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
17};
18
/// Names of the ECS API actions this service advertises.
///
/// The slice is returned as-is by `supported_actions`, and each entry has a
/// matching arm in the dispatch `match` inside `handle`. Keep the two in sync
/// when adding an action. The order is part of the returned value, so new
/// entries should be appended to their family rather than re-sorting the list.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Clusters
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    // Task definitions
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Account settings
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    // Tasks
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    // Services
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    // Container instances
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    // Attributes
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    // Capacity providers
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    // Task protection
    "GetTaskProtection",
    "UpdateTaskProtection",
    // Task sets
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    // Command execution and agent-facing state submission
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    // Service deployments and revisions
    "StopServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
    // Daemon task definitions
    "RegisterDaemonTaskDefinition",
    "DescribeDaemonTaskDefinition",
    "DeleteDaemonTaskDefinition",
    "ListDaemonTaskDefinitions",
    // Daemons
    "CreateDaemon",
    "DescribeDaemon",
    "UpdateDaemon",
    "DeleteDaemon",
    "ListDaemons",
    "DescribeDaemonDeployments",
    "ListDaemonDeployments",
    "DescribeDaemonRevisions",
    // Express gateway services
    "CreateExpressGatewayService",
    "DescribeExpressGatewayService",
    "UpdateExpressGatewayService",
    "DeleteExpressGatewayService",
];
97
98pub struct EcsService {
99 state: SharedEcsState,
100 snapshot_store: Option<Arc<dyn SnapshotStore>>,
101 snapshot_lock: Arc<AsyncMutex<()>>,
102 runtime: Option<Arc<crate::runtime::EcsRuntime>>,
103 role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
104}
105
106impl EcsService {
107 pub fn new(state: SharedEcsState) -> Self {
108 Self {
109 state,
110 snapshot_store: None,
111 snapshot_lock: Arc::new(AsyncMutex::new(())),
112 runtime: None,
113 role_trust_validator: None,
114 }
115 }
116
117 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
118 self.snapshot_store = Some(store);
119 self
120 }
121
122 pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
123 self.runtime = Some(runtime);
124 self
125 }
126
127 pub fn with_role_trust_validator(
128 mut self,
129 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
130 ) -> Self {
131 self.role_trust_validator = Some(validator);
132 self
133 }
134
135 fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
136 let Some(ref validator) = self.role_trust_validator else {
137 return Ok(());
138 };
139 if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
140 return Err(AwsServiceError::aws_error(
141 StatusCode::BAD_REQUEST,
142 "InvalidParameterException",
143 err.to_string(),
144 ));
145 }
146 Ok(())
147 }
148
149 pub fn state_handle(&self) -> &SharedEcsState {
150 &self.state
151 }
152
153 async fn save_snapshot(&self) {
154 let Some(store) = self.snapshot_store.clone() else {
155 return;
156 };
157 let _guard = self.snapshot_lock.lock().await;
158 let snapshot = EcsSnapshot {
159 schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
160 accounts: Some(self.state.read().clone()),
161 };
162 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
163 let bytes = serde_json::to_vec(&snapshot)
164 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
165 store.save(&bytes)
166 })
167 .await;
168 match join {
169 Ok(Ok(())) => {}
170 Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
171 Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
172 }
173 }
174
175 pub async fn reconcile_persisted_tasks(&self) {
188 let touched = {
189 let mut accounts = self.state.write();
190 let mut touched_tasks = 0usize;
191 let mut touched_services = 0usize;
192 let now = chrono::Utc::now();
193 for (_, state) in accounts.iter_mut() {
194 for task in state.tasks.values_mut() {
195 if task.last_status != "STOPPED" {
196 task.last_status = "STOPPED".to_string();
197 task.desired_status = "STOPPED".to_string();
198 task.stop_code = Some("EssentialContainerExited".to_string());
199 task.stopped_reason =
200 Some("fakecloud restart - backing container lost".to_string());
201 if task.stopping_at.is_none() {
202 task.stopping_at = Some(now);
203 }
204 if task.stopped_at.is_none() {
205 task.stopped_at = Some(now);
206 }
207 for container in task.containers.iter_mut() {
208 container.last_status = "STOPPED".to_string();
209 }
210 touched_tasks += 1;
211 }
212 }
213 for service in state.services.values_mut() {
214 if service.running_count != 0 || service.pending_count != 0 {
215 service.running_count = 0;
216 service.pending_count = 0;
217 for deployment in service.deployments.iter_mut() {
218 deployment.running_count = 0;
219 deployment.pending_count = 0;
220 }
221 touched_services += 1;
222 }
223 }
224 }
225 (touched_tasks, touched_services)
226 };
227 if touched.0 + touched.1 > 0 {
228 tracing::info!(
229 tasks = touched.0,
230 services = touched.1,
231 "reconciled persisted ecs tasks / service counts after restart",
232 );
233 self.save_snapshot().await;
234 }
235 }
236}
237
238#[async_trait]
239impl AwsService for EcsService {
240 fn service_name(&self) -> &str {
241 "ecs"
242 }
243
244 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
245 let mutates = is_mutating(request.action.as_str());
246 let result = match request.action.as_str() {
247 "CreateCluster" => self.create_cluster(&request),
248 "DescribeClusters" => self.describe_clusters(&request),
249 "DeleteCluster" => self.delete_cluster(&request),
250 "ListClusters" => self.list_clusters(&request),
251 "UpdateCluster" => self.update_cluster(&request),
252 "UpdateClusterSettings" => self.update_cluster_settings(&request),
253 "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
254 "RegisterTaskDefinition" => self.register_task_definition(&request),
255 "DescribeTaskDefinition" => self.describe_task_definition(&request),
256 "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
257 "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
258 "ListTaskDefinitions" => self.list_task_definitions(&request),
259 "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
260 "TagResource" => self.tag_resource(&request),
261 "UntagResource" => self.untag_resource(&request),
262 "ListTagsForResource" => self.list_tags_for_resource(&request),
263 "PutAccountSetting" => self.put_account_setting(&request),
264 "PutAccountSettingDefault" => self.put_account_setting_default(&request),
265 "DeleteAccountSetting" => self.delete_account_setting(&request),
266 "ListAccountSettings" => self.list_account_settings(&request),
267 "RunTask" => self.run_task(&request),
268 "StartTask" => self.start_task(&request),
269 "StopTask" => self.stop_task(&request).await,
270 "DescribeTasks" => self.describe_tasks(&request),
271 "ListTasks" => self.list_tasks(&request),
272 "CreateService" => self.create_service(&request),
273 "UpdateService" => self.update_service(&request),
274 "DeleteService" => self.delete_service(&request).await,
275 "DescribeServices" => self.describe_services(&request),
276 "ListServices" => self.list_services(&request),
277 "ListServicesByNamespace" => self.list_services_by_namespace(&request),
278 "RegisterContainerInstance" => self.register_container_instance(&request),
279 "DeregisterContainerInstance" => self.deregister_container_instance(&request),
280 "DescribeContainerInstances" => self.describe_container_instances(&request),
281 "ListContainerInstances" => self.list_container_instances(&request),
282 "UpdateContainerAgent" => self.update_container_agent(&request),
283 "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
284 "PutAttributes" => self.put_attributes(&request),
285 "DeleteAttributes" => self.delete_attributes(&request),
286 "ListAttributes" => self.list_attributes(&request),
287 "CreateCapacityProvider" => self.create_capacity_provider(&request),
288 "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
289 "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
290 "UpdateCapacityProvider" => self.update_capacity_provider(&request),
291 "GetTaskProtection" => self.get_task_protection(&request),
292 "UpdateTaskProtection" => self.update_task_protection(&request),
293 "CreateTaskSet" => self.create_task_set(&request),
294 "UpdateTaskSet" => self.update_task_set(&request),
295 "DeleteTaskSet" => self.delete_task_set(&request),
296 "DescribeTaskSets" => self.describe_task_sets(&request),
297 "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
298 "ExecuteCommand" => self.execute_command(&request).await,
299 "SubmitContainerStateChange" => self.submit_container_state_change(&request),
300 "SubmitTaskStateChange" => self.submit_task_state_change(&request),
301 "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
302 "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
303 "StopServiceDeployment" => self.stop_service_deployment(&request),
304 "ListServiceDeployments" => self.list_service_deployments(&request),
305 "DescribeServiceDeployments" => self.describe_service_deployments(&request),
306 "DescribeServiceRevisions" => self.describe_service_revisions(&request),
307 "RegisterDaemonTaskDefinition" => self.register_daemon_task_definition(&request),
308 "DescribeDaemonTaskDefinition" => self.describe_daemon_task_definition(&request),
309 "DeleteDaemonTaskDefinition" => self.delete_daemon_task_definition(&request),
310 "ListDaemonTaskDefinitions" => self.list_daemon_task_definitions(&request),
311 "CreateDaemon" => self.create_daemon(&request),
312 "DescribeDaemon" => self.describe_daemon(&request),
313 "UpdateDaemon" => self.update_daemon(&request),
314 "DeleteDaemon" => self.delete_daemon(&request),
315 "ListDaemons" => self.list_daemons(&request),
316 "DescribeDaemonDeployments" => self.describe_daemon_deployments(&request),
317 "ListDaemonDeployments" => self.list_daemon_deployments(&request),
318 "DescribeDaemonRevisions" => self.describe_daemon_revisions(&request),
319 "CreateExpressGatewayService" => self.create_express_gateway_service(&request),
320 "DescribeExpressGatewayService" => self.describe_express_gateway_service(&request),
321 "UpdateExpressGatewayService" => self.update_express_gateway_service(&request),
322 "DeleteExpressGatewayService" => self.delete_express_gateway_service(&request),
323 _ => Err(AwsServiceError::action_not_implemented(
324 "ecs",
325 &request.action,
326 )),
327 };
328 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
329 self.save_snapshot().await;
330 }
331 result
332 }
333
334 fn supported_actions(&self) -> &[&str] {
335 SUPPORTED_ACTIONS
336 }
337}
338
339#[path = "service_clusters.rs"]
356mod service_clusters;
357
358#[path = "service_task_definitions.rs"]
359mod service_task_definitions;
360
361#[path = "service_tagging.rs"]
362mod service_tagging;
363
364#[path = "service_account_settings.rs"]
365mod service_account_settings;
366
367#[path = "service_tasks.rs"]
368mod service_tasks;
369
370#[path = "service_services_resource.rs"]
371mod service_services_resource;
372
373#[path = "service_container_instances_etc.rs"]
374mod service_container_instances_etc;
375
376#[path = "service_daemons.rs"]
377mod service_daemons;
378
379#[path = "service_express_gateway.rs"]
380mod service_express_gateway;
381
382#[path = "helpers.rs"]
383mod helpers;
384pub(crate) use helpers::*;
385
386#[cfg(test)]
387#[path = "service_tests.rs"]
388mod tests;