1use std::sync::Arc;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Map, Value};
7use tokio::sync::Mutex as AsyncMutex;
8
9use fakecloud_aws::arn::Arn;
10use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
11use fakecloud_persistence::SnapshotStore;
12
13use crate::state::{
14 Attribute, AttributeRef, AwsLogsConfig, CapacityProvider, CircuitBreakerConfig, Cluster,
15 Container, ContainerInstance, Deployment, EcsSnapshot, EcsState, Service, SharedEcsState,
16 TagEntry, Task, TaskDefinition, TaskSet, ECS_SNAPSHOT_SCHEMA_VERSION,
17};
18
/// Names of every ECS API action this service accepts.
///
/// Returned as-is by `supported_actions`, and mirrored one-for-one by the
/// `match` arms in `handle`; keep both in step when adding or removing an
/// action.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Clusters
    "CreateCluster",
    "DescribeClusters",
    "DeleteCluster",
    "ListClusters",
    "UpdateCluster",
    "UpdateClusterSettings",
    "PutClusterCapacityProviders",
    // Task definitions
    "RegisterTaskDefinition",
    "DescribeTaskDefinition",
    "DeregisterTaskDefinition",
    "DeleteTaskDefinitions",
    "ListTaskDefinitions",
    "ListTaskDefinitionFamilies",
    // Tagging
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Account settings
    "PutAccountSetting",
    "PutAccountSettingDefault",
    "DeleteAccountSetting",
    "ListAccountSettings",
    // Tasks
    "RunTask",
    "StartTask",
    "StopTask",
    "DescribeTasks",
    "ListTasks",
    // Services
    "CreateService",
    "UpdateService",
    "DeleteService",
    "DescribeServices",
    "ListServices",
    "ListServicesByNamespace",
    // Container instances and attributes
    "RegisterContainerInstance",
    "DeregisterContainerInstance",
    "DescribeContainerInstances",
    "ListContainerInstances",
    "UpdateContainerAgent",
    "UpdateContainerInstancesState",
    "PutAttributes",
    "DeleteAttributes",
    "ListAttributes",
    // Capacity providers
    "CreateCapacityProvider",
    "DeleteCapacityProvider",
    "DescribeCapacityProviders",
    "UpdateCapacityProvider",
    // Task protection
    "GetTaskProtection",
    "UpdateTaskProtection",
    // Task sets
    "CreateTaskSet",
    "UpdateTaskSet",
    "DeleteTaskSet",
    "DescribeTaskSets",
    "UpdateServicePrimaryTaskSet",
    // Exec and agent-facing calls
    "ExecuteCommand",
    "SubmitContainerStateChange",
    "SubmitTaskStateChange",
    "SubmitAttachmentStateChanges",
    "DiscoverPollEndpoint",
    // Service deployments and revisions
    "StopServiceDeployment",
    "ContinueServiceDeployment",
    "ListServiceDeployments",
    "DescribeServiceDeployments",
    "DescribeServiceRevisions",
    // Daemon task definitions
    "RegisterDaemonTaskDefinition",
    "DescribeDaemonTaskDefinition",
    "DeleteDaemonTaskDefinition",
    "ListDaemonTaskDefinitions",
    // Daemons
    "CreateDaemon",
    "DescribeDaemon",
    "UpdateDaemon",
    "DeleteDaemon",
    "ListDaemons",
    "DescribeDaemonDeployments",
    "ListDaemonDeployments",
    "DescribeDaemonRevisions",
    // Express gateway services
    "CreateExpressGatewayService",
    "DescribeExpressGatewayService",
    "UpdateExpressGatewayService",
    "DeleteExpressGatewayService",
];
98
/// ECS service front end: dispatches API actions against shared state and
/// optionally persists that state and drives a container runtime.
pub struct EcsService {
    /// Shared ECS state; iterated per account by the reconcile methods.
    state: SharedEcsState,
    /// Where snapshots are written. `None` turns snapshot saving into a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held by `save_ecs_snapshot` while it captures and writes a
    /// snapshot, so concurrent saves run one at a time.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Runtime asked to run newly created tasks. When `None`, the scheduler
    /// marks such tasks as stopped instead.
    runtime: Option<Arc<crate::runtime::EcsRuntime>>,
    /// Validator consulted by `check_pass_role`. When `None`, the check
    /// always passes.
    role_trust_validator: Option<Arc<dyn fakecloud_core::auth::RoleTrustValidator>>,
}
106
107impl EcsService {
108 pub fn new(state: SharedEcsState) -> Self {
109 Self {
110 state,
111 snapshot_store: None,
112 snapshot_lock: Arc::new(AsyncMutex::new(())),
113 runtime: None,
114 role_trust_validator: None,
115 }
116 }
117
118 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119 self.snapshot_store = Some(store);
120 self
121 }
122
123 pub fn with_runtime(mut self, runtime: Arc<crate::runtime::EcsRuntime>) -> Self {
124 self.runtime = Some(runtime);
125 self
126 }
127
128 pub fn with_role_trust_validator(
129 mut self,
130 validator: Arc<dyn fakecloud_core::auth::RoleTrustValidator>,
131 ) -> Self {
132 self.role_trust_validator = Some(validator);
133 self
134 }
135
136 fn check_pass_role(&self, account_id: &str, role_arn: &str) -> Result<(), AwsServiceError> {
137 let Some(ref validator) = self.role_trust_validator else {
138 return Ok(());
139 };
140 if let Err(err) = validator.validate(account_id, role_arn, "ecs-tasks.amazonaws.com") {
141 return Err(AwsServiceError::aws_error(
142 StatusCode::BAD_REQUEST,
143 "InvalidParameterException",
144 err.to_string(),
145 ));
146 }
147 Ok(())
148 }
149
    /// Borrows the shared ECS state this service operates on.
    pub fn state_handle(&self) -> &SharedEcsState {
        &self.state
    }
153
154 async fn save_snapshot(&self) {
155 save_ecs_snapshot(
156 &self.state,
157 self.snapshot_store.clone(),
158 &self.snapshot_lock,
159 )
160 .await;
161 }
162
163 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
167 let store = self.snapshot_store.clone()?;
168 let state = self.state.clone();
169 let lock = self.snapshot_lock.clone();
170 Some(Arc::new(move || {
171 let state = state.clone();
172 let store = store.clone();
173 let lock = lock.clone();
174 Box::pin(async move {
175 save_ecs_snapshot(&state, Some(store), &lock).await;
176 })
177 }))
178 }
179
180 pub async fn reconcile_persisted_tasks(&self) {
193 let touched = {
194 let mut accounts = self.state.write();
195 let mut touched_tasks = 0usize;
196 let mut touched_services = 0usize;
197 let now = chrono::Utc::now();
198 for (_, state) in accounts.iter_mut() {
199 for task in state.tasks.values_mut() {
200 if task.last_status != "STOPPED" {
201 task.last_status = "STOPPED".to_string();
202 task.desired_status = "STOPPED".to_string();
203 task.stop_code = Some("EssentialContainerExited".to_string());
204 task.stopped_reason =
205 Some("fakecloud restart - backing container lost".to_string());
206 if task.stopping_at.is_none() {
207 task.stopping_at = Some(now);
208 }
209 if task.stopped_at.is_none() {
210 task.stopped_at = Some(now);
211 }
212 for container in task.containers.iter_mut() {
213 container.last_status = "STOPPED".to_string();
214 }
215 touched_tasks += 1;
216 }
217 }
218 for service in state.services.values_mut() {
219 if service.running_count != 0 || service.pending_count != 0 {
220 service.running_count = 0;
221 service.pending_count = 0;
222 for deployment in service.deployments.iter_mut() {
223 deployment.running_count = 0;
224 deployment.pending_count = 0;
225 }
226 touched_services += 1;
227 }
228 }
229 }
230 (touched_tasks, touched_services)
231 };
232 if touched.0 + touched.1 > 0 {
233 tracing::info!(
234 tasks = touched.0,
235 services = touched.1,
236 "reconciled persisted ecs tasks / service counts after restart",
237 );
238 self.save_snapshot().await;
239 }
240 }
241
242 pub async fn reconcile_service_desired_counts(&self) {
256 let mut to_launch: Vec<(String, String)> = Vec::new(); {
262 let mut accounts = self.state.write();
263 for (account_id, state) in accounts.iter_mut() {
264 let scalable: Vec<Service> = state
267 .services
268 .values()
269 .filter(|s| {
270 s.status == "ACTIVE"
271 && s.scheduling_strategy != "DAEMON"
272 && s.desired_count > 0
273 })
274 .cloned()
275 .collect();
276 for service in scalable {
277 let service_tag = format!("ecs-svc/{}", service.service_name);
278 let mut active = 0i32;
279 for t in state.tasks.values() {
280 if t.started_by.as_deref() == Some(service_tag.as_str())
281 && t.cluster_name == service.cluster_name
282 && matches!(
283 t.last_status.as_str(),
284 "RUNNING" | "PENDING" | "PROVISIONING"
285 )
286 {
287 active += 1;
288 }
289 }
290 let shortfall = service.desired_count - active;
291 if shortfall <= 0 {
292 continue;
293 }
294 let launch_type = if service.launch_type.is_empty() {
295 "FARGATE"
296 } else {
297 &service.launch_type
298 };
299 let ids = spawn_service_tasks(
300 state,
301 &service,
302 shortfall,
303 service.role_arn.as_deref().unwrap_or(""),
304 launch_type,
305 None,
306 );
307 if ids.is_empty() {
308 continue;
309 }
310 tracing::info!(
311 service = %service.service_name,
312 cluster = %service.cluster_name,
313 launched = ids.len(),
314 desired = service.desired_count,
315 "ecs scheduler: launching tasks to converge to desiredCount",
316 );
317 let svc_key = crate::state::EcsState::service_key(
323 &service.cluster_name,
324 &service.service_name,
325 );
326 if let Some(svc) = state.services.get_mut(&svc_key) {
327 svc.pending_count = svc.pending_count.saturating_add(ids.len() as i32);
328 for d in svc.deployments.iter_mut() {
329 if d.status == "PRIMARY" {
330 d.pending_count = d.pending_count.saturating_add(ids.len() as i32);
331 }
332 }
333 }
334 for id in ids {
335 to_launch.push((account_id.to_string(), id));
336 }
337 }
338 }
339 }
340
341 if to_launch.is_empty() {
342 return;
343 }
344
345 if let Some(rt) = &self.runtime {
349 for (account_id, task_id) in &to_launch {
350 rt.clone()
351 .run_task(self.state.clone(), task_id.clone(), account_id.clone());
352 }
353 } else {
354 let mut accounts = self.state.write();
355 for (account_id, task_id) in &to_launch {
356 if let Some(state) = accounts.get_mut(account_id) {
357 if let Some(t) = state.tasks.get_mut(task_id) {
358 t.last_status = "STOPPED".into();
359 t.desired_status = "STOPPED".into();
360 t.stop_code = Some("TaskFailedToStart".into());
361 t.stopped_reason = Some(
362 "No container runtime available (docker/podman not installed)".into(),
363 );
364 t.stopped_at = Some(chrono::Utc::now());
365 for c in t.containers.iter_mut() {
366 c.last_status = "STOPPED".into();
367 }
368 }
369 }
370 }
371 }
372
373 self.save_snapshot().await;
374 }
375}
376
377pub async fn save_ecs_snapshot(
378 state: &SharedEcsState,
379 store: Option<Arc<dyn SnapshotStore>>,
380 lock: &AsyncMutex<()>,
381) {
382 let Some(store) = store else {
383 return;
384 };
385 let _guard = lock.lock().await;
386 let snapshot = EcsSnapshot {
387 schema_version: ECS_SNAPSHOT_SCHEMA_VERSION,
388 accounts: Some(state.read().clone()),
389 };
390 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
391 let bytes = serde_json::to_vec(&snapshot)
392 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
393 store.save(&bytes)
394 })
395 .await;
396 match join {
397 Ok(Ok(())) => {}
398 Ok(Err(err)) => tracing::error!(%err, "failed to write ecs snapshot"),
399 Err(err) => tracing::error!(%err, "ecs snapshot task panicked"),
400 }
401}
402
403pub async fn run_scheduler_ticker(service: Arc<EcsService>, interval: std::time::Duration) {
407 let mut ticker = tokio::time::interval(interval);
408 ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
409 ticker.tick().await;
411 loop {
412 ticker.tick().await;
413 service.reconcile_service_desired_counts().await;
414 }
415}
416
417#[async_trait]
418impl AwsService for EcsService {
    /// Identifier under which this service is registered: `"ecs"`.
    fn service_name(&self) -> &str {
        "ecs"
    }
422
423 async fn handle(&self, request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
424 let mutates = is_mutating(request.action.as_str());
425 let result = match request.action.as_str() {
426 "CreateCluster" => self.create_cluster(&request),
427 "DescribeClusters" => self.describe_clusters(&request),
428 "DeleteCluster" => self.delete_cluster(&request),
429 "ListClusters" => self.list_clusters(&request),
430 "UpdateCluster" => self.update_cluster(&request),
431 "UpdateClusterSettings" => self.update_cluster_settings(&request),
432 "PutClusterCapacityProviders" => self.put_cluster_capacity_providers(&request),
433 "RegisterTaskDefinition" => self.register_task_definition(&request),
434 "DescribeTaskDefinition" => self.describe_task_definition(&request),
435 "DeregisterTaskDefinition" => self.deregister_task_definition(&request),
436 "DeleteTaskDefinitions" => self.delete_task_definitions(&request),
437 "ListTaskDefinitions" => self.list_task_definitions(&request),
438 "ListTaskDefinitionFamilies" => self.list_task_definition_families(&request),
439 "TagResource" => self.tag_resource(&request),
440 "UntagResource" => self.untag_resource(&request),
441 "ListTagsForResource" => self.list_tags_for_resource(&request),
442 "PutAccountSetting" => self.put_account_setting(&request),
443 "PutAccountSettingDefault" => self.put_account_setting_default(&request),
444 "DeleteAccountSetting" => self.delete_account_setting(&request),
445 "ListAccountSettings" => self.list_account_settings(&request),
446 "RunTask" => self.run_task(&request),
447 "StartTask" => self.start_task(&request),
448 "StopTask" => self.stop_task(&request).await,
449 "DescribeTasks" => self.describe_tasks(&request),
450 "ListTasks" => self.list_tasks(&request),
451 "CreateService" => self.create_service(&request),
452 "UpdateService" => self.update_service(&request),
453 "DeleteService" => self.delete_service(&request).await,
454 "DescribeServices" => self.describe_services(&request),
455 "ListServices" => self.list_services(&request),
456 "ListServicesByNamespace" => self.list_services_by_namespace(&request),
457 "RegisterContainerInstance" => self.register_container_instance(&request),
458 "DeregisterContainerInstance" => self.deregister_container_instance(&request),
459 "DescribeContainerInstances" => self.describe_container_instances(&request),
460 "ListContainerInstances" => self.list_container_instances(&request),
461 "UpdateContainerAgent" => self.update_container_agent(&request),
462 "UpdateContainerInstancesState" => self.update_container_instances_state(&request),
463 "PutAttributes" => self.put_attributes(&request),
464 "DeleteAttributes" => self.delete_attributes(&request),
465 "ListAttributes" => self.list_attributes(&request),
466 "CreateCapacityProvider" => self.create_capacity_provider(&request),
467 "DeleteCapacityProvider" => self.delete_capacity_provider(&request),
468 "DescribeCapacityProviders" => self.describe_capacity_providers(&request),
469 "UpdateCapacityProvider" => self.update_capacity_provider(&request),
470 "GetTaskProtection" => self.get_task_protection(&request),
471 "UpdateTaskProtection" => self.update_task_protection(&request),
472 "CreateTaskSet" => self.create_task_set(&request),
473 "UpdateTaskSet" => self.update_task_set(&request),
474 "DeleteTaskSet" => self.delete_task_set(&request),
475 "DescribeTaskSets" => self.describe_task_sets(&request),
476 "UpdateServicePrimaryTaskSet" => self.update_service_primary_task_set(&request),
477 "ExecuteCommand" => self.execute_command(&request).await,
478 "SubmitContainerStateChange" => self.submit_container_state_change(&request),
479 "SubmitTaskStateChange" => self.submit_task_state_change(&request),
480 "SubmitAttachmentStateChanges" => self.submit_attachment_state_changes(&request),
481 "DiscoverPollEndpoint" => self.discover_poll_endpoint(&request),
482 "StopServiceDeployment" => self.stop_service_deployment(&request),
483 "ContinueServiceDeployment" => self.continue_service_deployment(&request),
484 "ListServiceDeployments" => self.list_service_deployments(&request),
485 "DescribeServiceDeployments" => self.describe_service_deployments(&request),
486 "DescribeServiceRevisions" => self.describe_service_revisions(&request),
487 "RegisterDaemonTaskDefinition" => self.register_daemon_task_definition(&request),
488 "DescribeDaemonTaskDefinition" => self.describe_daemon_task_definition(&request),
489 "DeleteDaemonTaskDefinition" => self.delete_daemon_task_definition(&request),
490 "ListDaemonTaskDefinitions" => self.list_daemon_task_definitions(&request),
491 "CreateDaemon" => self.create_daemon(&request),
492 "DescribeDaemon" => self.describe_daemon(&request),
493 "UpdateDaemon" => self.update_daemon(&request),
494 "DeleteDaemon" => self.delete_daemon(&request),
495 "ListDaemons" => self.list_daemons(&request),
496 "DescribeDaemonDeployments" => self.describe_daemon_deployments(&request),
497 "ListDaemonDeployments" => self.list_daemon_deployments(&request),
498 "DescribeDaemonRevisions" => self.describe_daemon_revisions(&request),
499 "CreateExpressGatewayService" => self.create_express_gateway_service(&request),
500 "DescribeExpressGatewayService" => self.describe_express_gateway_service(&request),
501 "UpdateExpressGatewayService" => self.update_express_gateway_service(&request),
502 "DeleteExpressGatewayService" => self.delete_express_gateway_service(&request),
503 _ => Err(AwsServiceError::action_not_implemented(
504 "ecs",
505 &request.action,
506 )),
507 };
508 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
509 self.save_snapshot().await;
510 }
511 result
512 }
513
    /// Lists the action names this service accepts, in the order they appear
    /// in `SUPPORTED_ACTIONS`.
    fn supported_actions(&self) -> &[&str] {
        SUPPORTED_ACTIONS
    }
517}
518
// The modules below are loaded from the sibling files named in their
// `#[path]` attributes.
// NOTE(review): they presumably hold the per-action handler methods that
// `handle` dispatches to, grouped by resource family — confirm against those
// files.
#[path = "service_clusters.rs"]
mod service_clusters;

#[path = "service_task_definitions.rs"]
mod service_task_definitions;

#[path = "service_tagging.rs"]
mod service_tagging;

#[path = "service_account_settings.rs"]
mod service_account_settings;

#[path = "service_tasks.rs"]
mod service_tasks;

#[path = "service_services_resource.rs"]
mod service_services_resource;

#[path = "service_container_instances_etc.rs"]
mod service_container_instances_etc;

#[path = "service_daemons.rs"]
mod service_daemons;

#[path = "service_express_gateway.rs"]
mod service_express_gateway;

#[path = "helpers.rs"]
mod helpers;
// Glob re-export: whatever `helpers` exposes is usable from this module and
// from the rest of the crate through it.
pub(crate) use helpers::*;

// Test module, compiled only for test builds.
#[cfg(test)]
#[path = "service_tests.rs"]
mod tests;