Skip to main content

fakecloud_ecs/
service_tasks.rs

1// Auto-extracted from service.rs as part of carryover service.rs split.
2
3#![allow(clippy::too_many_arguments)]
4
5use chrono::Utc;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use super::*;
11
12impl EcsService {
13    /// Spawn a task from a cross-service caller (EventBridge Scheduler /
14    /// EventBridge Rules) without going through the AwsRequest dispatch
15    /// path. Builds the JSON body and reuses [`Self::run_task`] so all
16    /// the existing validation / runtime spawn logic runs identically.
17    /// Returns Err with a human-readable message on validation failures —
18    /// the caller decides whether to surface the failure (e.g. DLQ).
19    pub fn run_task_external(
20        &self,
21        account_id: &str,
22        cluster: &str,
23        task_definition: &str,
24        launch_type: Option<&str>,
25        count: usize,
26    ) -> Result<(), String> {
27        use bytes::Bytes;
28        use http::{HeaderMap, Method};
29        use std::collections::HashMap;
30        let body = serde_json::json!({
31            "cluster": cluster,
32            "taskDefinition": task_definition,
33            "launchType": launch_type.unwrap_or("FARGATE"),
34            "count": count.max(1) as i64,
35        });
36        let body_bytes =
37            Bytes::from(serde_json::to_vec(&body).map_err(|e| format!("encode body: {e}"))?);
38        let req = AwsRequest {
39            service: "ecs".into(),
40            action: "RunTask".into(),
41            region: "us-east-1".into(),
42            account_id: account_id.to_string(),
43            request_id: uuid::Uuid::new_v4().to_string(),
44            headers: HeaderMap::new(),
45            query_params: HashMap::new(),
46            body: body_bytes,
47            body_stream: parking_lot::Mutex::new(None),
48            path_segments: Vec::new(),
49            raw_path: "/".into(),
50            raw_query: String::new(),
51            method: Method::POST,
52            is_query_protocol: false,
53            access_key_id: None,
54            principal: None,
55        };
56        self.run_task(&req)
57            .map(|_| ())
58            .map_err(|e| format!("{e:?}"))
59    }
60
61    pub fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
62        let body = request.json_body();
63        let td_ref = req_str(&body, "taskDefinition")?;
64        let cluster_ref = opt_str(&body, "cluster");
65        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
66        let launch_type = opt_str(&body, "launchType")
67            .unwrap_or("FARGATE")
68            .to_string();
69        let placement_constraints: Vec<Value> = body
70            .get("placementConstraints")
71            .and_then(|v| v.as_array())
72            .cloned()
73            .unwrap_or_default();
74        let placement_strategy: Vec<Value> = body
75            .get("placementStrategy")
76            .and_then(|v| v.as_array())
77            .cloned()
78            .unwrap_or_default();
79        let count = body
80            .get("count")
81            .and_then(|v| v.as_i64())
82            .filter(|n| (1..=10).contains(n))
83            .unwrap_or(1) as usize;
84        let group = opt_str(&body, "group").map(String::from);
85        let started_by = opt_str(&body, "startedBy").map(String::from);
86        let enable_execute_command = body
87            .get("enableExecuteCommand")
88            .and_then(|v| v.as_bool())
89            .unwrap_or(false);
90        let propagate_tags = opt_str(&body, "propagateTags").map(String::from);
91        let _enable_ecs_managed_tags = body
92            .get("enableECSManagedTags")
93            .and_then(|v| v.as_bool())
94            .unwrap_or(false);
95        let _capacity_provider_strategy: Vec<Value> = body
96            .get("capacityProviderStrategy")
97            .and_then(|v| v.as_array())
98            .cloned()
99            .unwrap_or_default();
100        let volume_configurations: Vec<Value> = body
101            .get("volumeConfigurations")
102            .and_then(|v| v.as_array())
103            .cloned()
104            .unwrap_or_default();
105        let _availability_zone_rebalancing =
106            opt_str(&body, "availabilityZoneRebalancing").map(String::from);
107        let mut tags = parse_tags(&body);
108
109        // PassRole trust check on any role overrides supplied via the
110        // overrides.taskRoleArn / overrides.executionRoleArn fields.
111        // The base task definition was already checked at Register time,
112        // but RunTask can override either role and AWS re-validates the
113        // trust policy on every call.
114        if let Some(overrides) = body.get("overrides") {
115            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
116                self.check_pass_role(&request.account_id, role_arn)?;
117            }
118            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
119                self.check_pass_role(&request.account_id, role_arn)?;
120            }
121        }
122
123        let account = request.account_id.clone();
124        let runtime = self.runtime.clone();
125        let mut accounts = self.state.write();
126        let state = accounts.get_or_create(&account);
127        let cluster_arn = state
128            .clusters
129            .get(&cluster_name)
130            .map(|c| c.cluster_arn.clone())
131            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
132        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
133        let revisions = state
134            .task_definitions
135            .get(&family)
136            .ok_or_else(|| task_definition_not_found(td_ref))?;
137        let td = match rev {
138            Some(n) => revisions
139                .get(&n)
140                .ok_or_else(|| task_definition_not_found(td_ref))?,
141            None => latest_active_revision(revisions)
142                .ok_or_else(|| task_definition_not_found(td_ref))?,
143        };
144        if td.status != "ACTIVE" {
145            return Err(client_exception(format!(
146                "Task definition {} is not ACTIVE",
147                td.task_definition_arn
148            )));
149        }
150        let td_arn = td.task_definition_arn.clone();
151        let td_family = td.family.clone();
152        let td_revision = td.revision;
153        let td_cpu = td.cpu.clone();
154        let td_memory = td.memory.clone();
155        let td_task_role = td.task_role_arn.clone();
156        let td_exec_role = td.execution_role_arn.clone();
157        let td_containers = td.container_definitions.clone();
158        // RunTask supports propagateTags=TASK_DEFINITION to copy the
159        // TaskDefinition's tags onto each spawned task, in addition to
160        // any tags supplied directly in the request body. Real AWS
161        // unions the two sets; explicit tags win on key conflicts.
162        if propagate_tags.as_deref() == Some("TASK_DEFINITION") {
163            let mut td_tags = td.tags.clone();
164            td_tags.retain(|t| !tags.iter().any(|x| x.key == t.key));
165            tags.extend(td_tags);
166        }
167
168        let mut spawned_tasks: Vec<String> = Vec::new();
169        let mut task_jsons: Vec<Value> = Vec::new();
170        for _ in 0..count {
171            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
172            let task_arn = state.task_arn(&cluster_name, &task_id);
173            let containers: Vec<Container> = td_containers
174                .iter()
175                .map(|def| Container {
176                    container_arn: format!(
177                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
178                        state.region,
179                        state.account_id,
180                        cluster_name,
181                        task_id,
182                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
183                    ),
184                    name: def
185                        .get("name")
186                        .and_then(|v| v.as_str())
187                        .unwrap_or("app")
188                        .to_string(),
189                    image: def
190                        .get("image")
191                        .and_then(|v| v.as_str())
192                        .unwrap_or("")
193                        .to_string(),
194                    task_arn: task_arn.clone(),
195                    last_status: "PENDING".into(),
196                    exit_code: None,
197                    reason: None,
198                    runtime_id: None,
199                    essential: def
200                        .get("essential")
201                        .and_then(|v| v.as_bool())
202                        .unwrap_or(true),
203                    cpu: def
204                        .get("cpu")
205                        .and_then(|v| v.as_i64())
206                        .map(|n| n.to_string()),
207                    memory: def
208                        .get("memory")
209                        .and_then(|v| v.as_i64())
210                        .map(|n| n.to_string()),
211                    memory_reservation: def
212                        .get("memoryReservation")
213                        .and_then(|v| v.as_i64())
214                        .map(|n| n.to_string()),
215                    network_bindings: Vec::new(),
216                    network_interfaces: Vec::new(),
217                    health_status: Some("UNKNOWN".to_string()),
218                    managed_agents: None,
219                    image_digest: None,
220                })
221                .collect();
222            let awslogs = td_containers.iter().find_map(|def| {
223                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
224                let log_cfg = def.get("logConfiguration")?;
225                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
226                    return None;
227                }
228                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
229                Some(AwsLogsConfig {
230                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
231                    stream_prefix: opts
232                        .get("awslogs-stream-prefix")
233                        .and_then(|v| v.as_str())
234                        .map(String::from),
235                    region: opts
236                        .get("awslogs-region")
237                        .and_then(|v| v.as_str())
238                        .unwrap_or(&state.region)
239                        .to_string(),
240                    container_name: name,
241                })
242            });
243            let capacity_provider_name = body
244                .get("capacityProviderStrategy")
245                .and_then(|v| v.as_array())
246                .and_then(|arr| arr.first())
247                .and_then(|item| item.get("capacityProvider"))
248                .and_then(|v| v.as_str())
249                .map(String::from);
250            let mut task = Task {
251                task_arn: task_arn.clone(),
252                task_id: task_id.clone(),
253                cluster_arn: cluster_arn.clone(),
254                cluster_name: cluster_name.clone(),
255                task_definition_arn: td_arn.clone(),
256                family: td_family.clone(),
257                revision: td_revision,
258                container_instance_arn: None,
259                capacity_provider_name,
260                last_status: "PROVISIONING".into(),
261                desired_status: "RUNNING".into(),
262                launch_type: launch_type.clone(),
263                platform_version: Some("1.4.0".into()),
264                cpu: body
265                    .get("overrides")
266                    .and_then(|v| v.get("cpu"))
267                    .and_then(|v| v.as_str())
268                    .map(String::from)
269                    .or_else(|| td_cpu.clone()),
270                memory: body
271                    .get("overrides")
272                    .and_then(|v| v.get("memory"))
273                    .and_then(|v| v.as_str())
274                    .map(String::from)
275                    .or_else(|| td_memory.clone()),
276                containers,
277                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
278                started_by: started_by.clone(),
279                group: group.clone(),
280                connectivity: "CONNECTING".into(),
281                stop_code: None,
282                stopped_reason: None,
283                created_at: Utc::now(),
284                started_at: None,
285                stopping_at: None,
286                stopped_at: None,
287                pull_started_at: None,
288                pull_stopped_at: None,
289                connectivity_at: None,
290                started_by_ref_id: None,
291                execution_role_arn: td_exec_role.clone(),
292                task_role_arn: td_task_role.clone(),
293                tags: tags.clone(),
294                awslogs,
295                captured_logs: String::new(),
296                protection: None,
297                enable_execute_command,
298                attachments: Vec::new(),
299                volume_configurations: volume_configurations.clone(),
300                task_set_arn: None,
301            };
302            // Best-effort placement for EC2 / EXTERNAL launch types.
303            if launch_type != "FARGATE" {
304                if let Some(arn) = crate::placement::select_container_instance(
305                    state,
306                    &cluster_name,
307                    &placement_constraints,
308                    &placement_strategy,
309                    task.group.as_deref(),
310                    &td_arn,
311                    &launch_type,
312                ) {
313                    task.container_instance_arn = Some(arn.clone());
314                    if let Some(ci) = state
315                        .container_instances
316                        .values_mut()
317                        .find(|ci| ci.container_instance_arn == arn)
318                    {
319                        ci.pending_tasks_count += 1;
320                    }
321                }
322            }
323            state.tasks.insert(task_id.clone(), task.clone());
324            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
325                cluster.pending_tasks_count += 1;
326            }
327            // Snapshot-in-progress: transition to PENDING synchronously so
328            // callers that immediately DescribeTasks see movement. RUNNING /
329            // STOPPED come later from the background runtime task.
330            if let Some(t) = state.tasks.get_mut(&task_id) {
331                t.last_status = "PENDING".into();
332            }
333            task_jsons.push(task_to_json(&task));
334            spawned_tasks.push(task_id.clone());
335        }
336        drop(accounts);
337
338        // Launch container execution outside the state lock.
339        if let Some(rt) = runtime {
340            for id in &spawned_tasks {
341                rt.clone()
342                    .run_task(self.state.clone(), id.clone(), account.clone());
343            }
344        } else {
345            // No runtime available — fail fast so the task doesn't stay
346            // PENDING forever. We incremented pending_tasks_count above;
347            // decrement it here so the cluster counter doesn't drift and
348            // block later DeleteCluster calls.
349            let mut accounts = self.state.write();
350            if let Some(state) = accounts.get_mut(&account) {
351                let mut cluster_drains: Vec<String> = Vec::new();
352                for id in &spawned_tasks {
353                    if let Some(t) = state.tasks.get_mut(id) {
354                        t.last_status = "STOPPED".into();
355                        // desired_status stays RUNNING: the task was requested
356                        // to run but failed to start. AWS keeps desired_status
357                        // as RUNNING for failed standalone RunTask until the
358                        // caller explicitly stops it. This also ensures
359                        // list_tasks (default filter=RUNNING) finds it.
360                        t.stop_code = Some("TaskFailedToStart".into());
361                        t.stopped_reason = Some(
362                            "No container runtime available (docker/podman not installed)".into(),
363                        );
364                        t.stopped_at = Some(Utc::now());
365                        for c in t.containers.iter_mut() {
366                            c.last_status = "STOPPED".into();
367                        }
368                        cluster_drains.push(t.cluster_name.clone());
369                    }
370                }
371                for name in cluster_drains {
372                    if let Some(cluster) = state.clusters.get_mut(&name) {
373                        if cluster.pending_tasks_count > 0 {
374                            cluster.pending_tasks_count -= 1;
375                        }
376                    }
377                }
378            }
379        }
380
381        Ok(AwsResponse::ok_json(json!({
382            "tasks": task_jsons,
383            "failures": [],
384        })))
385    }
386
387    pub(super) fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
388        // StartTask targets explicit container instances. Our ECS emulator
389        // has no concept of registered container instances yet (Batch 4);
390        // fall through to the same semantics as RunTask so the API is
391        // usable while the container-instance surface is pending.
392        self.run_task(request)
393    }
394
395    pub(super) async fn stop_task(
396        &self,
397        request: &AwsRequest,
398    ) -> Result<AwsResponse, AwsServiceError> {
399        let body = request.json_body();
400        let task_ref = req_str(&body, "task")?;
401        let reason = opt_str(&body, "reason")
402            .unwrap_or("UserInitiated")
403            .to_string();
404        let cluster_ref = opt_str(&body, "cluster");
405        let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
406
407        let (task_id, account, task_snapshot) = {
408            let account = request.account_id.clone();
409            let mut accounts = self.state.write();
410            let state = accounts
411                .get_mut(&account)
412                .ok_or_else(|| task_not_found(task_ref))?;
413            let task_id = resolve_task_id(state, task_ref)?;
414            let task = state
415                .tasks
416                .get_mut(&task_id)
417                .ok_or_else(|| task_not_found(task_ref))?;
418            task.desired_status = "STOPPED".into();
419            task.stopping_at = Some(Utc::now());
420            task.stopped_reason = Some(reason.clone());
421            task.stop_code = Some("UserInitiated".into());
422            (task_id, account, task.clone())
423        };
424        if let Some(rt) = &self.runtime {
425            rt.stop_task(&task_id, &reason).await;
426        }
427        let _ = account;
428        Ok(AwsResponse::ok_json(json!({
429            "task": task_to_json(&task_snapshot),
430        })))
431    }
432
433    pub(super) fn describe_tasks(
434        &self,
435        request: &AwsRequest,
436    ) -> Result<AwsResponse, AwsServiceError> {
437        let body = request.json_body();
438        let refs: Vec<String> = body
439            .get("tasks")
440            .and_then(|v| v.as_array())
441            .map(|arr| {
442                arr.iter()
443                    .filter_map(|v| v.as_str().map(String::from))
444                    .collect()
445            })
446            .unwrap_or_default();
447        let include_tags = body
448            .get("include")
449            .and_then(|v| v.as_array())
450            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
451            .unwrap_or(false);
452
453        let account = request.account_id.clone();
454        let accounts = self.state.read();
455        let Some(state) = accounts.get(&account) else {
456            return Ok(AwsResponse::ok_json(json!({
457                "tasks": [],
458                "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
459            })));
460        };
461        let mut found = Vec::new();
462        let mut failures = Vec::new();
463        for input in &refs {
464            let task_id = task_id_from_ref(input);
465            match state.tasks.get(&task_id) {
466                Some(t) => {
467                    let mut v = task_to_json(t);
468                    if include_tags {
469                        v.as_object_mut()
470                            .unwrap()
471                            .insert("tags".into(), tags_json(&t.tags));
472                    }
473                    found.push(v);
474                }
475                None => {
476                    failures.push(json!({
477                        "arn": input,
478                        "reason": "MISSING",
479                    }));
480                }
481            }
482        }
483        Ok(AwsResponse::ok_json(json!({
484            "tasks": found,
485            "failures": failures,
486        })))
487    }
488
489    pub(super) fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
490        let body = request.json_body();
491        let cluster_ref = opt_str(&body, "cluster");
492        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
493        let family = opt_str(&body, "family");
494        let status_filter = opt_str(&body, "desiredStatus").or(Some("RUNNING"));
495        let started_by = opt_str(&body, "startedBy");
496        let max_results = body
497            .get("maxResults")
498            .and_then(|v| v.as_i64())
499            .filter(|n| (1..=100).contains(n))
500            .map(|n| n as usize)
501            .unwrap_or(100);
502        let next_token = opt_str(&body, "nextToken").unwrap_or("");
503
504        let account = request.account_id.clone();
505        let accounts = self.state.read();
506        let mut arns: Vec<String> = match accounts.get(&account) {
507            Some(state) => state
508                .tasks
509                .values()
510                .filter(|t| t.cluster_name == cluster_name)
511                .filter(|t| family.is_none_or(|f| t.family == f))
512                .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
513                .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
514                .map(|t| t.task_arn.clone())
515                .collect(),
516            None => Vec::new(),
517        };
518        arns.sort();
519        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
520        let end = (start + max_results).min(arns.len());
521        let page = arns[start..end].to_vec();
522        let mut out = json!({"taskArns": page});
523        if end < arns.len() {
524            out.as_object_mut()
525                .unwrap()
526                .insert("nextToken".into(), json!(end.to_string()));
527        }
528        Ok(AwsResponse::ok_json(out))
529    }
530}
531
532#[cfg(test)]
533mod multi_container_tests {
534    use super::*;
535    use crate::EcsService;
536    use bytes::Bytes;
537    use fakecloud_core::multi_account::MultiAccountState;
538    use http::{HeaderMap, Method};
539    use parking_lot::RwLock;
540    use std::collections::HashMap;
541    use std::sync::Arc;
542
543    fn fresh_service() -> EcsService {
544        let accounts: MultiAccountState<EcsState> =
545            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
546        let state = Arc::new(RwLock::new(accounts));
547        let svc = EcsService::new(state.clone());
548        // Pre-create the cluster so RunTask doesn't trip on a missing one.
549        let mut accounts = state.write();
550        let s = accounts.get_or_create("000000000000");
551        let arn = s.cluster_arn("default");
552        s.clusters
553            .insert("default".into(), Cluster::new("default", arn));
554        drop(accounts);
555        svc
556    }
557
558    fn make_request(action: &str, body: Value) -> AwsRequest {
559        let body_bytes = Bytes::from(serde_json::to_vec(&body).unwrap());
560        AwsRequest {
561            service: "ecs".into(),
562            action: action.into(),
563            region: "us-east-1".into(),
564            account_id: "000000000000".into(),
565            request_id: uuid::Uuid::new_v4().to_string(),
566            headers: HeaderMap::new(),
567            query_params: HashMap::new(),
568            body: body_bytes,
569            body_stream: parking_lot::Mutex::new(None),
570            path_segments: Vec::new(),
571            raw_path: "/".into(),
572            raw_query: String::new(),
573            method: Method::POST,
574            is_query_protocol: false,
575            access_key_id: None,
576            principal: None,
577        }
578    }
579
580    #[test]
581    fn register_task_def_with_two_containers_then_run_task_starts_both() {
582        let svc = fresh_service();
583        let reg = make_request(
584            "RegisterTaskDefinition",
585            json!({
586                "family": "multi",
587                "containerDefinitions": [
588                    {"name": "app", "image": "alpine"},
589                    {"name": "sidecar", "image": "alpine"}
590                ]
591            }),
592        );
593        svc.register_task_definition(&reg)
594            .expect("register should succeed");
595
596        let run = make_request(
597            "RunTask",
598            json!({
599                "cluster": "default",
600                "taskDefinition": "multi",
601            }),
602        );
603        let resp = svc.run_task(&run).expect("run_task should succeed");
604        let body: Value =
605            serde_json::from_slice(resp.body.expect_bytes()).expect("body should be valid JSON");
606        let tasks = body
607            .get("tasks")
608            .and_then(|v| v.as_array())
609            .expect("tasks array");
610        assert_eq!(tasks.len(), 1);
611        let task = &tasks[0];
612        let containers = task
613            .get("containers")
614            .and_then(|v| v.as_array())
615            .expect("containers array on task");
616        assert_eq!(containers.len(), 2, "expected both containers in task");
617        let names: Vec<&str> = containers
618            .iter()
619            .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
620            .collect();
621        assert!(names.contains(&"app"));
622        assert!(names.contains(&"sidecar"));
623
624        // Per-container ARNs must be distinct so DescribeTasks can address
625        // each container independently.
626        let arns: std::collections::HashSet<&str> = containers
627            .iter()
628            .filter_map(|c| c.get("containerArn").and_then(|v| v.as_str()))
629            .collect();
630        assert_eq!(arns.len(), 2);
631    }
632
633    #[test]
634    fn register_task_def_defaults_essential_true() {
635        let svc = fresh_service();
636        let reg = make_request(
637            "RegisterTaskDefinition",
638            json!({
639                "family": "default-essential",
640                // No `essential` declared on either container.
641                "containerDefinitions": [
642                    {"name": "main", "image": "alpine"},
643                    {"name": "extra", "image": "alpine"}
644                ]
645            }),
646        );
647        svc.register_task_definition(&reg).unwrap();
648
649        let run = make_request(
650            "RunTask",
651            json!({
652                "cluster": "default",
653                "taskDefinition": "default-essential",
654            }),
655        );
656        let resp = svc.run_task(&run).unwrap();
657        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
658        let containers = body["tasks"][0]["containers"].as_array().unwrap();
659        for c in containers {
660            assert_eq!(
661                c.get("essential").and_then(|v| v.as_bool()),
662                Some(true),
663                "container {:?} should default essential=true",
664                c.get("name")
665            );
666        }
667    }
668
669    #[test]
670    fn task_to_json_emits_full_container_array() {
671        // Build a Task with two containers directly and confirm helper
672        // emits both entries in the response shape.
673        let mut task = Task {
674            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
675            task_id: "abc".into(),
676            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
677            cluster_name: "default".into(),
678            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/multi:1"
679                .into(),
680            family: "multi".into(),
681            revision: 1,
682            container_instance_arn: None,
683            capacity_provider_name: None,
684            last_status: "RUNNING".into(),
685            desired_status: "RUNNING".into(),
686            launch_type: "FARGATE".into(),
687            platform_version: None,
688            cpu: None,
689            memory: None,
690            containers: Vec::new(),
691            overrides: json!({}),
692            started_by: None,
693            group: None,
694            connectivity: "CONNECTED".into(),
695            stop_code: None,
696            stopped_reason: None,
697            created_at: chrono::Utc::now(),
698            started_at: None,
699            stopping_at: None,
700            stopped_at: None,
701            pull_started_at: None,
702            pull_stopped_at: None,
703            connectivity_at: None,
704            started_by_ref_id: None,
705            execution_role_arn: None,
706            task_role_arn: None,
707            tags: Vec::new(),
708            awslogs: None,
709            captured_logs: String::new(),
710            protection: None,
711            enable_execute_command: false,
712            attachments: Vec::new(),
713            volume_configurations: Vec::new(),
714            task_set_arn: None,
715        };
716        for name in ["app", "sidecar"] {
717            task.containers.push(Container {
718                container_arn: format!(
719                    "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
720                ),
721                name: name.into(),
722                image: "alpine".into(),
723                task_arn: task.task_arn.clone(),
724                last_status: "RUNNING".into(),
725                exit_code: None,
726                reason: None,
727                runtime_id: Some(format!("docker-{name}")),
728                essential: true,
729                cpu: None,
730                memory: None,
731                memory_reservation: None,
732                network_bindings: Vec::new(),
733                network_interfaces: Vec::new(),
734                health_status: None,
735                managed_agents: None,
736                image_digest: None,
737            });
738        }
739
740        let v = task_to_json(&task);
741        let containers = v
742            .get("containers")
743            .and_then(|v| v.as_array())
744            .expect("containers array");
745        assert_eq!(containers.len(), 2);
746        let names: Vec<&str> = containers
747            .iter()
748            .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
749            .collect();
750        assert_eq!(names, vec!["app", "sidecar"]);
751        for c in containers {
752            assert!(c.get("containerArn").is_some());
753            assert!(c.get("name").is_some());
754            assert!(c.get("lastStatus").is_some());
755            assert!(c.get("runtimeId").is_some());
756            assert_eq!(c.get("essential").and_then(|v| v.as_bool()), Some(true));
757        }
758    }
759}
760
761#[cfg(test)]
762mod port_mapping_tests {
763    //! Cover the `portMappings` -> `docker run --publish` translation
764    //! plus the per-container `networkBindings` projected onto the task.
765    //! The argv builder is exercised directly so we don't need a real
766    //! container CLI on the test host.
767    use super::*;
768    use crate::runtime::{
769        build_run_argv, mark_running_multi, ContainerPlan, PortMapping, RunningContainer,
770    };
771    use crate::state::{Container, EcsState};
772    use crate::SharedEcsState;
773    use chrono::Utc;
774    use fakecloud_core::multi_account::MultiAccountState;
775    use parking_lot::RwLock;
776    use std::sync::Arc;
777
778    fn plan_with_ports(
779        port_mappings: Vec<PortMapping>,
780        network_mode: Option<&str>,
781    ) -> ContainerPlan {
782        ContainerPlan {
783            container_name: "app".into(),
784            image: "alpine:latest".into(),
785            env: Vec::new(),
786            entry_point: Vec::new(),
787            command: Vec::new(),
788            secrets_refs: Vec::new(),
789            essential: true,
790            has_task_role: false,
791            port_mappings,
792            network_mode: network_mode.map(String::from),
793            depends_on: Vec::new(),
794            health_check: None,
795            volume_mounts: Vec::new(),
796            ulimits: Vec::new(),
797            linux_parameters: None,
798            stop_timeout: None,
799            user: None,
800            working_directory: None,
801            tty: false,
802            interactive: false,
803            readonly_rootfs: false,
804        }
805    }
806
807    fn argv_string(plan: &ContainerPlan) -> Vec<String> {
808        build_run_argv(plan, &[], "task-1", "host-gateway", "alpine:latest")
809    }
810
811    /// Helper for asserting a `--publish <spec>` pair is present in argv.
812    /// Returns true when the flag/value pair appears as adjacent entries.
813    fn argv_has_publish(argv: &[String], spec: &str) -> bool {
814        argv.windows(2).any(|w| w[0] == "--publish" && w[1] == spec)
815    }
816
817    #[test]
818    fn port_mappings_translate_to_publish_flags() {
819        let plan = plan_with_ports(
820            vec![PortMapping {
821                container_port: 80,
822                host_port: 8080,
823                protocol: "tcp".into(),
824            }],
825            None,
826        );
827        let argv = argv_string(&plan);
828        assert!(
829            argv_has_publish(&argv, "80:8080/tcp"),
830            "expected --publish 80:8080/tcp in argv: {argv:?}"
831        );
832    }
833
834    #[test]
835    fn port_mappings_default_host_port_to_container_port() {
836        // host_port=0 in the parsed mapping means "AWS host-mode default";
837        // parse_port_mapping rewrites that to containerPort, so by the time
838        // we reach build_run_argv the host_port should already equal 80.
839        // Drive the same path through the JSON parser to lock in the
840        // default behaviour end to end.
841        let parsed =
842            crate::runtime::__test_parse_port_mapping(&serde_json::json!({"containerPort": 80}))
843                .expect("containerPort should parse");
844        assert_eq!(
845            parsed.host_port, 80,
846            "default hostPort should mirror containerPort"
847        );
848        let argv = argv_string(&plan_with_ports(vec![parsed], None));
849        assert!(
850            argv_has_publish(&argv, "80:80/tcp"),
851            "expected --publish 80:80/tcp when hostPort omitted: {argv:?}"
852        );
853    }
854
855    #[test]
856    fn port_mappings_default_protocol_tcp() {
857        let parsed = crate::runtime::__test_parse_port_mapping(
858            &serde_json::json!({"containerPort": 443, "hostPort": 443}),
859        )
860        .expect("containerPort should parse");
861        assert_eq!(parsed.protocol, "tcp");
862        let argv = argv_string(&plan_with_ports(vec![parsed], None));
863        assert!(
864            argv_has_publish(&argv, "443:443/tcp"),
865            "expected default protocol tcp: {argv:?}"
866        );
867    }
868
869    #[test]
870    fn awsvpc_network_mode_skips_publish() {
871        let plan = plan_with_ports(
872            vec![PortMapping {
873                container_port: 80,
874                host_port: 8080,
875                protocol: "tcp".into(),
876            }],
877            Some("awsvpc"),
878        );
879        let argv = argv_string(&plan);
880        assert!(
881            !argv.iter().any(|s| s == "--publish"),
882            "awsvpc must not emit --publish: {argv:?}"
883        );
884    }
885
886    #[test]
887    fn awsvpc_network_mode_includes_network_flag() {
888        let plan = plan_with_ports(Vec::new(), Some("awsvpc"));
889        let argv = argv_string(&plan);
890        let network_idx = argv.iter().position(|s| s == "--network");
891        assert!(
892            network_idx.is_some(),
893            "awsvpc must emit --network: {argv:?}"
894        );
895        let network_name = argv.get(network_idx.unwrap() + 1);
896        assert!(
897            network_name
898                .map(|n| n.starts_with("fakecloud-ecs-"))
899                .unwrap_or(false),
900            "awsvpc must reference fakecloud-ecs network: {argv:?}"
901        );
902    }
903
904    #[test]
905    fn network_bindings_populated_on_task() {
906        // Build a task in state, run mark_running_multi with a started
907        // container that has network_bindings populated, and verify
908        // task_to_json emits them under containers[0].networkBindings.
909        let mut accounts: MultiAccountState<EcsState> =
910            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
911        let acct = accounts.get_or_create("000000000000");
912        let arn = acct.cluster_arn("default");
913        acct.clusters
914            .insert("default".into(), Cluster::new("default", arn));
915        let mut task = Task {
916            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
917            task_id: "abc".into(),
918            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
919            cluster_name: "default".into(),
920            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/web:1".into(),
921            family: "web".into(),
922            revision: 1,
923            container_instance_arn: None,
924            capacity_provider_name: None,
925            last_status: "PENDING".into(),
926            desired_status: "RUNNING".into(),
927            launch_type: "FARGATE".into(),
928            platform_version: None,
929            cpu: None,
930            memory: None,
931            containers: vec![Container {
932                container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/web"
933                    .into(),
934                name: "web".into(),
935                image: "alpine".into(),
936                task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
937                last_status: "PENDING".into(),
938                exit_code: None,
939                reason: None,
940                runtime_id: None,
941                essential: true,
942                cpu: None,
943                memory: None,
944                memory_reservation: None,
945                network_bindings: Vec::new(),
946                network_interfaces: Vec::new(),
947                health_status: None,
948                managed_agents: None,
949                image_digest: None,
950            }],
951            overrides: serde_json::json!({}),
952            started_by: None,
953            group: None,
954            connectivity: "CONNECTING".into(),
955            stop_code: None,
956            stopped_reason: None,
957            created_at: Utc::now(),
958            started_at: None,
959            stopping_at: None,
960            stopped_at: None,
961            pull_started_at: None,
962            pull_stopped_at: None,
963            connectivity_at: None,
964            started_by_ref_id: None,
965            execution_role_arn: None,
966            task_role_arn: None,
967            tags: Vec::new(),
968            awslogs: None,
969            captured_logs: String::new(),
970            protection: None,
971            enable_execute_command: false,
972            attachments: Vec::new(),
973            volume_configurations: Vec::new(),
974            task_set_arn: None,
975        };
976        task.last_status = "PENDING".into();
977        acct.tasks.insert("abc".into(), task);
978        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
979
980        let bindings = vec![serde_json::json!({
981            "bindIP": "0.0.0.0",
982            "containerPort": 80,
983            "hostPort": 8080,
984            "protocol": "tcp",
985        })];
986        let started = vec![RunningContainer {
987            name: "web".into(),
988            container_id: "docker-id".into(),
989            essential: true,
990            exit_code: None,
991            network_bindings: bindings.clone(),
992            image_digest: None,
993        }];
994        mark_running_multi(&state, "000000000000", "abc", &started);
995
996        let accounts = state.read();
997        let task = accounts
998            .get("000000000000")
999            .unwrap()
1000            .tasks
1001            .get("abc")
1002            .unwrap();
1003        let json = task_to_json(task);
1004        let nb = &json["containers"][0]["networkBindings"];
1005        assert_eq!(nb, &serde_json::Value::Array(bindings));
1006    }
1007}