Skip to main content

fakecloud_ecs/
service.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{
14    Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
15    Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
16    TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
17};
18
/// ECS API action names this service advertises through
/// `AwsService::supported_actions`.
///
/// The entries mirror, one for one and in the same order, the dispatch arms
/// of `EcsService::handle`; an action added to one must be added to the other.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    "GetTaskProtection",
    "UpdateTaskProtection",
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    "StopServiceDeployment",
    "ContinueServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
    "RegisterDaemonTaskDefinition",
    "DescribeDaemonTaskDefinition",
    "DeleteDaemonTaskDefinition",
    "ListDaemonTaskDefinitions",
    "CreateDaemon",
    "DescribeDaemon",
    "UpdateDaemon",
    "DeleteDaemon",
    "ListDaemons",
    "DescribeDaemonDeployments",
    "ListDaemonDeployments",
    "DescribeDaemonRevisions",
    "CreateExpressGatewayService",
    "DescribeExpressGatewayService",
    "UpdateExpressGatewayService",
    "DeleteExpressGatewayService",
];
98
/// The ECS service implementation: request dispatch plus the background
/// reconciliation passes, all operating on one shared state handle.
pub struct EcsService {
    /// Shared ECS state; iterated per account by the reconciliation passes.
    state: SharedEcsState,
    /// Where snapshots are written. `None` means nothing is persisted
    /// (`save_ecs_snapshot` returns immediately).
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held for the whole of a snapshot write (state clone, serialization and
    /// store call) so concurrent saves run one at a time.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Container runtime that scheduler-launched tasks are handed to. When
    /// `None`, those tasks are marked STOPPED instead.
    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
    /// Validator consulted by `check_pass_role`. When `None`, that check
    /// always succeeds.
    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
}
106
107impl EcsService {
108    pub fn new(state: SharedEcsState) -> Self {
109        Self {
110            state,
111            snapshot_store: None,
112            snapshot_lock: Arc::new(AsyncMutex::new(())),
113            runtime: None,
114            role_trust_validator: None,
115        }
116    }
117
118    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119        self.snapshot_store = Some(store);
120        self
121    }
122
123    pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
124        self.runtime = Some(runtime);
125        self
126    }
127
128    pub fn with_role_trust_validator(
129        mut self,
130        validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
131    ) -> Self {
132        self.role_trust_validator = Some(validator);
133        self
134    }
135
136    fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
137        let Some(ref validator) = self.role_trust_validator else {
138            return Ok(());
139        };
140        if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
141            return Err(AwsServiceError::aws_error(
142                StatusCode::BAD_REQUEST,
143                "InvalidParameterException",
144                err.to_string(),
145            ));
146        }
147        Ok(())
148    }
149
150    pub fn state_handle(&self) -> &SharedEcsState {
151        &self.state
152    }
153
154    async fn save_snapshot(&self) {
155        save_ecs_snapshot(
156            &self.state,
157            self.snapshot_store.clone(),
158            &self.snapshot_lock,
159        )
160        .await;
161    }
162
163    /// Build a hook that persists the current state when invoked, or `None` in
164    /// memory mode. The CloudFormation provisioner mutates `state` directly and
165    /// uses this to write a CFN-provisioned resource through to disk.
166    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
167        let store = self.snapshot_store.clone()?;
168        let state = self.state.clone();
169        let lock = self.snapshot_lock.clone();
170        Some(Arc::new(move || {
171            let state = state.clone();
172            let store = store.clone();
173            let lock = lock.clone();
174            Box::pin(async move {
175                save_ecs_snapshot(&state, Some(store), &lock).await;
176            })
177        }))
178    }
179
180    /// Reconcile persisted task state with reality after a fakecloud
181    /// restart. Same bug class as RDS #1338: tasks were persisted with
182    /// `lastStatus = RUNNING` but their docker containers are gone, so
183    /// DescribeTasks reported phantom-running tasks and `docker exec`
184    /// (for ECS Exec) would fail.
185    ///
186    /// Mark every non-STOPPED task as STOPPED with a `stoppedReason`
187    /// that explains the restart, and reset service `runningCount` /
188    /// `pendingCount` to zero. The service scheduler ticker then
189    /// reconciles desiredCount and launches fresh tasks. Standalone
190    /// `RunTask` tasks aren't auto-respawned because we don't have
191    /// enough context to replay them safely; callers re-invoke.
192    pub async fn reconcile_persisted_tasks(&self) {
193        let touched = {
194            let mut accounts = self.state.write();
195            let mut touched_tasks = 0usize;
196            let mut touched_services = 0usize;
197            let now = chrono::Utc::now();
198            for (_, state) in accounts.iter_mut() {
199                for task in state.tasks.values_mut() {
200                    if task.last_status != "STOPPED" {
201                        task.last_status = "STOPPED".to_string();
202                        task.desired_status = "STOPPED".to_string();
203                        task.stop_code = Some("EssentialContainerExited".to_string());
204                        task.stopped_reason =
205                            Some("fakecloud restart - backing container lost".to_string());
206                        if task.stopping_at.is_none() {
207                            task.stopping_at = Some(now);
208                        }
209                        if task.stopped_at.is_none() {
210                            task.stopped_at = Some(now);
211                        }
212                        for container in task.containers.iter_mut() {
213                            container.last_status = "STOPPED".to_string();
214                        }
215                        touched_tasks += 1;
216                    }
217                }
218                for service in state.services.values_mut() {
219                    if service.running_count != 0 || service.pending_count != 0 {
220                        service.running_count = 0;
221                        service.pending_count = 0;
222                        for deployment in service.deployments.iter_mut() {
223                            deployment.running_count = 0;
224                            deployment.pending_count = 0;
225                        }
226                        touched_services += 1;
227                    }
228                }
229            }
230            (touched_tasks, touched_services)
231        };
232        if touched.0 + touched.1 > 0 {
233            tracing::info!(
234                tasks = touched.0,
235                services = touched.1,
236                "reconciled persisted ecs tasks / service counts after restart",
237            );
238            self.save_snapshot().await;
239        }
240    }
241
242    /// Converge every ACTIVE service toward its `desiredCount`, launching
243    /// replacement tasks for any shortfall. Real ECS maintains `desiredCount`
244    /// autonomously; `reconcile_persisted_tasks` STOPs all tasks and zeroes
245    /// counts on restart "trusting a scheduler ticker," but no such ticker
246    /// existed, so services with `desiredCount=N` stayed at `runningCount=0`
247    /// forever. This is that ticker. It also re-launches tasks lost to crashed
248    /// containers during normal operation. bug-audit 2026-06-15, 4.7.
249    ///
250    /// One pass: for each service, count its non-STOPPED tasks (matched by the
251    /// `ecs-svc/<name>` `started_by` tag, same as `recompute_service_counts`);
252    /// if that count is below `desiredCount`, spawn the difference via the
253    /// shared `spawn_service_tasks` path and hand the new task IDs to the
254    /// runtime. Persist only when something was launched.
255    pub async fn reconcile_service_desired_counts(&self) {
256        // Phase 1 (write lock): find shortfalls, spawn PENDING task rows,
257        // and refresh each service's running/pending counts so DescribeServices
258        // is accurate immediately. Collect the new task IDs to launch after the
259        // lock is released.
260        let mut to_launch: Vec<(String, String)> = Vec::new(); // (account_id, task_id)
261        {
262            let mut accounts = self.state.write();
263            for (account_id, state) in accounts.iter_mut() {
264                // Snapshot the services we may need to scale so we don't hold
265                // an aliasing borrow of `state` while mutating it.
266                let scalable: Vec<Service> = state
267                    .services
268                    .values()
269                    .filter(|s| {
270                        s.status == "ACTIVE"
271                            && s.scheduling_strategy != "DAEMON"
272                            && s.desired_count > 0
273                    })
274                    .cloned()
275                    .collect();
276                for service in scalable {
277                    let service_tag = format!("ecs-svc/{}", service.service_name);
278                    let mut active = 0i32;
279                    for t in state.tasks.values() {
280                        if t.started_by.as_deref() == Some(service_tag.as_str())
281                            && t.cluster_name == service.cluster_name
282                            && matches!(
283                                t.last_status.as_str(),
284                                "RUNNING" | "PENDING" | "PROVISIONING"
285                            )
286                        {
287                            active += 1;
288                        }
289                    }
290                    let shortfall = service.desired_count - active;
291                    if shortfall <= 0 {
292                        continue;
293                    }
294                    let launch_type = if service.launch_type.is_empty() {
295                        "FARGATE"
296                    } else {
297                        &service.launch_type
298                    };
299                    let ids = spawn_service_tasks(
300                        state,
301                        &service,
302                        shortfall,
303                        service.role_arn.as_deref().unwrap_or(""),
304                        launch_type,
305                        None,
306                    );
307                    if ids.is_empty() {
308                        continue;
309                    }
310                    tracing::info!(
311                        service = %service.service_name,
312                        cluster = %service.cluster_name,
313                        launched = ids.len(),
314                        desired = service.desired_count,
315                        "ecs scheduler: launching tasks to converge to desiredCount",
316                    );
317                    // Reflect the new PENDING tasks in the stored counts.
318                    // Services are keyed by "cluster/name", not the bare name —
319                    // looking up by service_name returned None, so these counts
320                    // were never updated (stale snapshot until DescribeServices
321                    // recomputed them).
322                    let svc_key = crate::state::EcsState::service_key(
323                        &service.cluster_name,
324                        &service.service_name,
325                    );
326                    if let Some(svc) = state.services.get_mut(&svc_key) {
327                        svc.pending_count = svc.pending_count.saturating_add(ids.len() as i32);
328                        for d in svc.deployments.iter_mut() {
329                            if d.status == "PRIMARY" {
330                                d.pending_count = d.pending_count.saturating_add(ids.len() as i32);
331                            }
332                        }
333                    }
334                    for id in ids {
335                        to_launch.push((account_id.to_string(), id));
336                    }
337                }
338            }
339        }
340
341        if to_launch.is_empty() {
342            return;
343        }
344
345        // Phase 2 (no lock): hand new tasks to the runtime, which advances them
346        // PENDING -> RUNNING in the background. Without a runtime (docker/podman
347        // absent) mark them STOPPED so they don't linger PENDING forever.
348        if let Some(rt) = &self.runtime {
349            for (account_id, task_id) in &to_launch {
350                rt.clone()
351                    .run_task(self.state.clone(), task_id.clone(), account_id.clone());
352            }
353        } else {
354            let mut accounts = self.state.write();
355            for (account_id, task_id) in &to_launch {
356                if let Some(state) = accounts.get_mut(account_id) {
357                    if let Some(t) = state.tasks.get_mut(task_id) {
358                        t.last_status = "STOPPED".into();
359                        t.desired_status = "STOPPED".into();
360                        t.stop_code = Some("TaskFailedToStart".into());
361                        t.stopped_reason = Some(
362                            "No container runtime available (docker/podman not installed)".into(),
363                        );
364                        t.stopped_at = Some(chrono::Utc::now());
365                        for c in t.containers.iter_mut() {
366                            c.last_status = "STOPPED".into();
367                        }
368                    }
369                }
370            }
371        }
372
373        self.save_snapshot().await;
374    }
375}
376
377pub async fn save_ecs_snapshot(
378    state: &SharedEcsState,
379    store: Option<Arc<dyn SnapshotStore>>,
380    lock: &AsyncMutex<()>,
381) {
382    let Some(store) = store else {
383        return;
384    };
385    let _guard = lock.lock().await;
386    let snapshot = EcsSnapshot {
387        schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
388        accounts: Some(state.read().clone()),
389    };
390    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
391        let bytes = serde_json::to_vec(&snapshot)
392            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
393        store.save(&bytes)
394    })
395    .await;
396    match join {
397        Ok(Ok(())) => {}
398        Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
399        Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
400    }
401}
402
403/// Background scheduler ticker: periodically converges ECS services toward
404/// `desiredCount`. Wired in `fakecloud-server` like the other tickers. Real
405/// ECS does this continuously; we tick every few seconds. bug-audit 4.7.
406pub async fn run_scheduler_ticker(service: Arc<EcsService>, interval: std::time::Duration) {
407    let mut ticker = tokio::time::interval(interval);
408    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
409    // Skip the immediate first tick; restart reconciliation runs separately.
410    ticker.tick().await;
411    loop {
412        ticker.tick().await;
413        service.reconcile_service_desired_counts().await;
414    }
415}
416
417#[async_trait]
418impl AwsService for EcsService {
419    fn service_name(&self) -> &str {
420        "ecs"
421    }
422
423    async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
424        let mutates = is_mutating(request.action.as_str());
425        let result = match request.action.as_str() {
426            "CreateCluster" => self.create_cluster(&request),
427            "DescribeClusters" => self.describe_clusters(&request),
428            "DeleteCluster" => self.delete_cluster(&request),
429            "ListClusters" => self.list_clusters(&request),
430            "UpdateCluster" => self.update_cluster(&request),
431            "UpdateClusterSettings" => self.update_cluster_settings(&request),
432            "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
433            "RegisterTaskDefinition" => self.register_task_definition(&request),
434            "DescribeTaskDefinition" => self.describe_task_definition(&request),
435            "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
436            "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
437            "ListTaskDefinitions" => self.list_task_definitions(&request),
438            "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
439            "TagResource" => self.tag_resource(&request),
440            "UntagResource" => self.untag_resource(&request),
441            "ListTagsForResource" => self.list_tags_for_resource(&request),
442            "PutAccountSetting" => self.put_account_setting(&request),
443            "PutAccountSettingDefault" => self.put_account_setting_default(&request),
444            "DeleteAccountSetting" => self.delete_account_setting(&request),
445            "ListAccountSettings" => self.list_account_settings(&request),
446            "RunTask" => self.run_task(&request),
447            "StartTask" => self.start_task(&request),
448            "StopTask" => self.stop_task(&request).await,
449            "DescribeTasks" => self.describe_tasks(&request),
450            "ListTasks" => self.list_tasks(&request),
451            "CreateService" => self.create_service(&request),
452            "UpdateService" => self.update_service(&request),
453            "DeleteService" => self.delete_service(&request).await,
454            "DescribeServices" => self.describe_services(&request),
455            "ListServices" => self.list_services(&request),
456            "ListServicesByNamespace" => self.list_services_by_namespace(&request),
457            "RegisterContainerInstance" => self.register_container_instance(&request),
458            "DeregisterContainerInstance" => self.deregister_container_instance(&request),
459            "DescribeContainerInstances" => self.describe_container_instances(&request),
460            "ListContainerInstances" => self.list_container_instances(&request),
461            "UpdateContainerAgent" => self.update_container_agent(&request),
462            "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
463            "PutAttributes" => self.put_attributes(&request),
464            "DeleteAttributes" => self.delete_attributes(&request),
465            "ListAttributes" => self.list_attributes(&request),
466            "CreateCapacityProvider" => self.create_capacity_provider(&request),
467            "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
468            "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
469            "UpdateCapacityProvider" => self.update_capacity_provider(&request),
470            "GetTaskProtection" => self.get_task_protection(&request),
471            "UpdateTaskProtection" => self.update_task_protection(&request),
472            "CreateTaskSet" => self.create_task_set(&request),
473            "UpdateTaskSet" => self.update_task_set(&request),
474            "DeleteTaskSet" => self.delete_task_set(&request),
475            "DescribeTaskSets" => self.describe_task_sets(&request),
476            "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
477            "ExecuteCommand" => self.execute_command(&request).await,
478            "SubmitContainerStateChange" => self.submit_container_state_change(&request),
479            "SubmitTaskStateChange" => self.submit_task_state_change(&request),
480            "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
481            "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
482            "StopServiceDeployment" => self.stop_service_deployment(&request),
483            "ContinueServiceDeployment" => self.continue_service_deployment(&request),
484            "ListServiceDeployments" => self.list_service_deployments(&request),
485            "DescribeServiceDeployments" => self.describe_service_deployments(&request),
486            "DescribeServiceRevisions" => self.describe_service_revisions(&request),
487            "RegisterDaemonTaskDefinition" => self.register_daemon_task_definition(&request),
488            "DescribeDaemonTaskDefinition" => self.describe_daemon_task_definition(&request),
489            "DeleteDaemonTaskDefinition" => self.delete_daemon_task_definition(&request),
490            "ListDaemonTaskDefinitions" => self.list_daemon_task_definitions(&request),
491            "CreateDaemon" => self.create_daemon(&request),
492            "DescribeDaemon" => self.describe_daemon(&request),
493            "UpdateDaemon" => self.update_daemon(&request),
494            "DeleteDaemon" => self.delete_daemon(&request),
495            "ListDaemons" => self.list_daemons(&request),
496            "DescribeDaemonDeployments" => self.describe_daemon_deployments(&request),
497            "ListDaemonDeployments" => self.list_daemon_deployments(&request),
498            "DescribeDaemonRevisions" => self.describe_daemon_revisions(&request),
499            "CreateExpressGatewayService" => self.create_express_gateway_service(&request),
500            "DescribeExpressGatewayService" => self.describe_express_gateway_service(&request),
501            "UpdateExpressGatewayService" => self.update_express_gateway_service(&request),
502            "DeleteExpressGatewayService" => self.delete_express_gateway_service(&request),
503            _ => Err(AwsServiceError::action_not_implemented(
504                "ecs",
505                &request.action,
506            )),
507        };
508        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
509            self.save_snapshot().await;
510        }
511        result
512    }
513
514    fn supported_actions(&self) -> &[&str] {
515        SUPPORTED_ACTIONS
516    }
517}
518
519// -------- helpers --------
520
521// -------- operations: clusters --------
522
523// -------- operations: task definitions --------
524
525// -------- operations: tagging --------
526
527// -------- operations: account settings --------
528
529// -------- operations: tasks --------
530
531// -------- operations: services --------
532
533// -------- operations: container instances, attributes, capacity providers, task sets, task protection, ExecuteCommand, agent surface --------
534
535#[path = "service_clusters.rs"]
536mod service_clusters;
537
538#[path = "service_task_definitions.rs"]
539mod service_task_definitions;
540
541#[path = "service_tagging.rs"]
542mod service_tagging;
543
544#[path = "service_account_settings.rs"]
545mod service_account_settings;
546
547#[path = "service_tasks.rs"]
548mod service_tasks;
549
550#[path = "service_services_resource.rs"]
551mod service_services_resource;
552
553#[path = "service_container_instances_etc.rs"]
554mod service_container_instances_etc;
555
556#[path = "service_daemons.rs"]
557mod service_daemons;
558
559#[path = "service_express_gateway.rs"]
560mod service_express_gateway;
561
562#[path = "helpers.rs"]
563mod helpers;
564pub(crate) use helpers::*;
565
566#[cfg(test)]
567#[path = "service_tests.rs"]
568mod tests;