// fakecloud_ecs/service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{
14    Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
15    Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
16    TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
17};
18
/// Every ECS API action that `EcsService::handle` dispatches to a handler.
///
/// Returned verbatim by `AwsService::supported_actions`. An action missing
/// from this list falls through to `action_not_implemented` in `handle`.
/// Entries are listed in the same order as the dispatch `match` so the two
/// can be compared side by side.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Clusters
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    // Task definitions
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Account settings
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    // Tasks
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    // Services
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    // Container instances
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    // Attributes
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    // Capacity providers
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    // Task protection
    "GetTaskProtection",
    "UpdateTaskProtection",
    // Task sets
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    // ECS Exec and agent-facing endpoints
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    // Service deployments and revisions
    "StopServiceDeployment",
    "ContinueServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
    // Daemon task definitions
    "RegisterDaemonTaskDefinition",
    "DescribeDaemonTaskDefinition",
    "DeleteDaemonTaskDefinition",
    "ListDaemonTaskDefinitions",
    // Daemons
    "CreateDaemon",
    "DescribeDaemon",
    "UpdateDaemon",
    "DeleteDaemon",
    "ListDaemons",
    "DescribeDaemonDeployments",
    "ListDaemonDeployments",
    "DescribeDaemonRevisions",
    // Express gateway services
    "CreateExpressGatewayService",
    "DescribeExpressGatewayService",
    "UpdateExpressGatewayService",
    "DeleteExpressGatewayService",
];
98
99pub struct EcsService {
100    state: SharedEcsState,
101    snapshot_store: Option<Arc<dyn SnapshotStore>>,
102    snapshot_lock: Arc<AsyncMutex<()>>,
103    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
104    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
105}
106
107impl EcsService {
108    pub fn new(state: SharedEcsState) -> Self {
109        Self {
110            state,
111            snapshot_store: None,
112            snapshot_lock: Arc::new(AsyncMutex::new(())),
113            runtime: None,
114            role_trust_validator: None,
115        }
116    }
117
118    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119        self.snapshot_store = Some(store);
120        self
121    }
122
123    pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
124        self.runtime = Some(runtime);
125        self
126    }
127
128    pub fn with_role_trust_validator(
129        mut self,
130        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
131    ) -> Self {
132        self.role_trust_validator = Some(validator);
133        self
134    }
135
136    fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
137        let Some(ref validator) = self.role_trust_validator else {
138            return Ok(());
139        };
140        if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
141            return Err(AwsServiceError::aws_error(
142                StatusCode::BAD_REQUEST,
143                "InvalidParameterException",
144                err.to_string(),
145            ));
146        }
147        Ok(())
148    }
149
150    pub fn state_handle(&self) -> &SharedEcsState {
151        &self.state
152    }
153
154    async fn save_snapshot(&self) {
155        let Some(store) = self.snapshot_store.clone() else {
156            return;
157        };
158        let _guard = self.snapshot_lock.lock().await;
159        let snapshot = EcsSnapshot {
160            schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
161            accounts: Some(self.state.read().clone()),
162        };
163        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
164            let bytes = serde_json::to_vec(&snapshot)
165                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
166            store.save(&bytes)
167        })
168        .await;
169        match join {
170            Ok(Ok(())) => {}
171            Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
172            Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
173        }
174    }
175
176    /// Reconcile persisted task state with reality after a fakecloud
177    /// restart. Same bug class as RDS #1338: tasks were persisted with
178    /// `lastStatus = RUNNING` but their docker containers are gone, so
179    /// DescribeTasks reported phantom-running tasks and `docker exec`
180    /// (for ECS Exec) would fail.
181    ///
182    /// Mark every non-STOPPED task as STOPPED with a `stoppedReason`
183    /// that explains the restart, and reset service `runningCount` /
184    /// `pendingCount` to zero. The service scheduler ticker then
185    /// reconciles desiredCount and launches fresh tasks. Standalone
186    /// `RunTask` tasks aren't auto-respawned because we don't have
187    /// enough context to replay them safely; callers re-invoke.
188    pub async fn reconcile_persisted_tasks(&self) {
189        let touched = {
190            let mut accounts = self.state.write();
191            let mut touched_tasks = 0usize;
192            let mut touched_services = 0usize;
193            let now = chrono::Utc::now();
194            for (_, state) in accounts.iter_mut() {
195                for task in state.tasks.values_mut() {
196                    if task.last_status != "STOPPED" {
197                        task.last_status = "STOPPED".to_string();
198                        task.desired_status = "STOPPED".to_string();
199                        task.stop_code = Some("EssentialContainerExited".to_string());
200                        task.stopped_reason =
201                            Some("fakecloud restart - backing container lost".to_string());
202                        if task.stopping_at.is_none() {
203                            task.stopping_at = Some(now);
204                        }
205                        if task.stopped_at.is_none() {
206                            task.stopped_at = Some(now);
207                        }
208                        for container in task.containers.iter_mut() {
209                            container.last_status = "STOPPED".to_string();
210                        }
211                        touched_tasks += 1;
212                    }
213                }
214                for service in state.services.values_mut() {
215                    if service.running_count != 0 || service.pending_count != 0 {
216                        service.running_count = 0;
217                        service.pending_count = 0;
218                        for deployment in service.deployments.iter_mut() {
219                            deployment.running_count = 0;
220                            deployment.pending_count = 0;
221                        }
222                        touched_services += 1;
223                    }
224                }
225            }
226            (touched_tasks, touched_services)
227        };
228        if touched.0 + touched.1 > 0 {
229            tracing::info!(
230                tasks = touched.0,
231                services = touched.1,
232                "reconciled persisted ecs tasks / service counts after restart",
233            );
234            self.save_snapshot().await;
235        }
236    }
237}
238
239#[async_trait]
240impl AwsService for EcsService {
241    fn service_name(&self) -> &str {
242        "ecs"
243    }
244
245    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
246        let mutates = is_mutating(request.action.as_str());
247        let result = match request.action.as_str() {
248            "CreateCluster" => self.create_cluster(&request),
249            "DescribeClusters" => self.describe_clusters(&request),
250            "DeleteCluster" => self.delete_cluster(&request),
251            "ListClusters" => self.list_clusters(&request),
252            "UpdateCluster" => self.update_cluster(&request),
253            "UpdateClusterSettings" => self.update_cluster_settings(&request),
254            "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
255            "RegisterTaskDefinition" => self.register_task_definition(&request),
256            "DescribeTaskDefinition" => self.describe_task_definition(&request),
257            "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
258            "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
259            "ListTaskDefinitions" => self.list_task_definitions(&request),
260            "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
261            "TagResource" => self.tag_resource(&request),
262            "UntagResource" => self.untag_resource(&request),
263            "ListTagsForResource" => self.list_tags_for_resource(&request),
264            "PutAccountSetting" => self.put_account_setting(&request),
265            "PutAccountSettingDefault" => self.put_account_setting_default(&request),
266            "DeleteAccountSetting" => self.delete_account_setting(&request),
267            "ListAccountSettings" => self.list_account_settings(&request),
268            "RunTask" => self.run_task(&request),
269            "StartTask" => self.start_task(&request),
270            "StopTask" => self.stop_task(&request).await,
271            "DescribeTasks" => self.describe_tasks(&request),
272            "ListTasks" => self.list_tasks(&request),
273            "CreateService" => self.create_service(&request),
274            "UpdateService" => self.update_service(&request),
275            "DeleteService" => self.delete_service(&request).await,
276            "DescribeServices" => self.describe_services(&request),
277            "ListServices" => self.list_services(&request),
278            "ListServicesByNamespace" => self.list_services_by_namespace(&request),
279            "RegisterContainerInstance" => self.register_container_instance(&request),
280            "DeregisterContainerInstance" => self.deregister_container_instance(&request),
281            "DescribeContainerInstances" => self.describe_container_instances(&request),
282            "ListContainerInstances" => self.list_container_instances(&request),
283            "UpdateContainerAgent" => self.update_container_agent(&request),
284            "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
285            "PutAttributes" => self.put_attributes(&request),
286            "DeleteAttributes" => self.delete_attributes(&request),
287            "ListAttributes" => self.list_attributes(&request),
288            "CreateCapacityProvider" => self.create_capacity_provider(&request),
289            "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
290            "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
291            "UpdateCapacityProvider" => self.update_capacity_provider(&request),
292            "GetTaskProtection" => self.get_task_protection(&request),
293            "UpdateTaskProtection" => self.update_task_protection(&request),
294            "CreateTaskSet" => self.create_task_set(&request),
295            "UpdateTaskSet" => self.update_task_set(&request),
296            "DeleteTaskSet" => self.delete_task_set(&request),
297            "DescribeTaskSets" => self.describe_task_sets(&request),
298            "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
299            "ExecuteCommand" => self.execute_command(&request).await,
300            "SubmitContainerStateChange" => self.submit_container_state_change(&request),
301            "SubmitTaskStateChange" => self.submit_task_state_change(&request),
302            "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
303            "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
304            "StopServiceDeployment" => self.stop_service_deployment(&request),
305            "ContinueServiceDeployment" => self.continue_service_deployment(&request),
306            "ListServiceDeployments" => self.list_service_deployments(&request),
307            "DescribeServiceDeployments" => self.describe_service_deployments(&request),
308            "DescribeServiceRevisions" => self.describe_service_revisions(&request),
309            "RegisterDaemonTaskDefinition" => self.register_daemon_task_definition(&request),
310            "DescribeDaemonTaskDefinition" => self.describe_daemon_task_definition(&request),
311            "DeleteDaemonTaskDefinition" => self.delete_daemon_task_definition(&request),
312            "ListDaemonTaskDefinitions" => self.list_daemon_task_definitions(&request),
313            "CreateDaemon" => self.create_daemon(&request),
314            "DescribeDaemon" => self.describe_daemon(&request),
315            "UpdateDaemon" => self.update_daemon(&request),
316            "DeleteDaemon" => self.delete_daemon(&request),
317            "ListDaemons" => self.list_daemons(&request),
318            "DescribeDaemonDeployments" => self.describe_daemon_deployments(&request),
319            "ListDaemonDeployments" => self.list_daemon_deployments(&request),
320            "DescribeDaemonRevisions" => self.describe_daemon_revisions(&request),
321            "CreateExpressGatewayService" => self.create_express_gateway_service(&request),
322            "DescribeExpressGatewayService" => self.describe_express_gateway_service(&request),
323            "UpdateExpressGatewayService" => self.update_express_gateway_service(&request),
324            "DeleteExpressGatewayService" => self.delete_express_gateway_service(&request),
325            _ => Err(AwsServiceError::action_not_implemented(
326                "ecs",
327                &request.action,
328            )),
329        };
330        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
331            self.save_snapshot().await;
332        }
333        result
334    }
335
336    fn supported_actions(&self) -> &[&str] {
337        SUPPORTED_ACTIONS
338    }
339}
340
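// The handlers for each area are implemented in their own `#[path]`
// submodule, declared below. Index of areas and their source files:
//
// -------- helpers (helpers.rs) --------
// -------- operations: clusters (service_clusters.rs) --------
// -------- operations: task definitions (service_task_definitions.rs) --------
// -------- operations: tagging (service_tagging.rs) --------
// -------- operations: account settings (service_account_settings.rs) --------
// -------- operations: tasks (service_tasks.rs) --------
// -------- operations: services (service_services_resource.rs) --------
// -------- operations: container instances, attributes, capacity providers,
//          task sets, task protection, ExecuteCommand, agent surface
//          (service_container_instances_etc.rs) --------
// -------- operations: daemons (service_daemons.rs) --------
// -------- operations: express gateway services (service_express_gateway.rs) --------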
356
357#[path = "service_clusters.rs"]
358mod service_clusters;
359
360#[path = "service_task_definitions.rs"]
361mod service_task_definitions;
362
363#[path = "service_tagging.rs"]
364mod service_tagging;
365
366#[path = "service_account_settings.rs"]
367mod service_account_settings;
368
369#[path = "service_tasks.rs"]
370mod service_tasks;
371
372#[path = "service_services_resource.rs"]
373mod service_services_resource;
374
375#[path = "service_container_instances_etc.rs"]
376mod service_container_instances_etc;
377
378#[path = "service_daemons.rs"]
379mod service_daemons;
380
381#[path = "service_express_gateway.rs"]
382mod service_express_gateway;
383
384#[path = "helpers.rs"]
385mod helpers;
386pub(crate) use helpers::*;
387
388#[cfg(test)]
389#[path = "service_tests.rs"]
390mod tests;