Skip to main content

fakecloud_ecs/
service_tasks.rs

1// Auto-extracted from service.rs as part of carryover service.rs split.
2
3#![allow(clippy::too_many_arguments)]
4
5use chrono::Utc;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use super::*;
11
12impl EcsService {
13    /// Spawn a task from a cross-service caller (EventBridge Scheduler /
14    /// EventBridge Rules) without going through the AwsRequest dispatch
15    /// path. Builds the JSON body and reuses [`Self::run_task`] so all
16    /// the existing validation / runtime spawn logic runs identically.
17    /// Returns Err with a human-readable message on validation failures —
18    /// the caller decides whether to surface the failure (e.g. DLQ).
19    pub fn run_task_external(
20        &self,
21        account_id: &str,
22        cluster: &str,
23        task_definition: &str,
24        launch_type: Option<&str>,
25        count: usize,
26    ) -> Result<(), String> {
27        use bytes::Bytes;
28        use http::{HeaderMap, Method};
29        use std::collections::HashMap;
30        let body = serde_json::json!({
31            "cluster": cluster,
32            "taskDefinition": task_definition,
33            "launchType": launch_type.unwrap_or("FARGATE"),
34            "count": count.max(1) as i64,
35        });
36        let body_bytes =
37            Bytes::from(serde_json::to_vec(&body).map_err(|e| format!("encode body: {e}"))?);
38        let req = AwsRequest {
39            service: "ecs".into(),
40            action: "RunTask".into(),
41            region: "us-east-1".into(),
42            account_id: account_id.to_string(),
43            request_id: uuid::Uuid::new_v4().to_string(),
44            headers: HeaderMap::new(),
45            query_params: HashMap::new(),
46            body: body_bytes,
47            body_stream: parking_lot::Mutex::new(None),
48            path_segments: Vec::new(),
49            raw_path: "/".into(),
50            raw_query: String::new(),
51            method: Method::POST,
52            is_query_protocol: false,
53            access_key_id: None,
54            principal: None,
55        };
56        self.run_task(&req)
57            .map(|_| ())
58            .map_err(|e| format!("{e:?}"))
59    }
60
61    pub fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
62        let body = request.json_body();
63        let td_ref = req_str(&body, "taskDefinition")?;
64        let cluster_ref = opt_str(&body, "cluster");
65        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
66        let launch_type = opt_str(&body, "launchType")
67            .unwrap_or("FARGATE")
68            .to_string();
69        let placement_constraints: Vec<Value> = body
70            .get("placementConstraints")
71            .and_then(|v| v.as_array())
72            .cloned()
73            .unwrap_or_default();
74        let placement_strategy: Vec<Value> = body
75            .get("placementStrategy")
76            .and_then(|v| v.as_array())
77            .cloned()
78            .unwrap_or_default();
79        let count = body
80            .get("count")
81            .and_then(|v| v.as_i64())
82            .filter(|n| (1..=10).contains(n))
83            .unwrap_or(1) as usize;
84        let group = opt_str(&body, "group").map(String::from);
85        let started_by = opt_str(&body, "startedBy").map(String::from);
86        let enable_execute_command = body
87            .get("enableExecuteCommand")
88            .and_then(|v| v.as_bool())
89            .unwrap_or(false);
90        let propagate_tags = opt_str(&body, "propagateTags").map(String::from);
91        let _enable_ecs_managed_tags = body
92            .get("enableECSManagedTags")
93            .and_then(|v| v.as_bool())
94            .unwrap_or(false);
95        let _capacity_provider_strategy: Vec<Value> = body
96            .get("capacityProviderStrategy")
97            .and_then(|v| v.as_array())
98            .cloned()
99            .unwrap_or_default();
100        let volume_configurations: Vec<Value> = body
101            .get("volumeConfigurations")
102            .and_then(|v| v.as_array())
103            .cloned()
104            .unwrap_or_default();
105        let _availability_zone_rebalancing =
106            opt_str(&body, "availabilityZoneRebalancing").map(String::from);
107        let mut tags = parse_tags(&body);
108
109        // PassRole trust check on any role overrides supplied via the
110        // overrides.taskRoleArn / overrides.executionRoleArn fields.
111        // The base task definition was already checked at Register time,
112        // but RunTask can override either role and AWS re-validates the
113        // trust policy on every call.
114        if let Some(overrides) = body.get("overrides") {
115            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
116                self.check_pass_role(&request.account_id, role_arn)?;
117            }
118            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
119                self.check_pass_role(&request.account_id, role_arn)?;
120            }
121        }
122
123        let account = request.account_id.clone();
124        let runtime = self.runtime.clone();
125        let mut accounts = self.state.write();
126        let state = accounts.get_or_create(&account);
127        let cluster_arn = state
128            .clusters
129            .get(&cluster_name)
130            .map(|c| c.cluster_arn.clone())
131            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
132        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
133        let revisions = state
134            .task_definitions
135            .get(&family)
136            .ok_or_else(|| task_definition_not_found(td_ref))?;
137        let td = match rev {
138            Some(n) => revisions
139                .get(&n)
140                .ok_or_else(|| task_definition_not_found(td_ref))?,
141            None => latest_active_revision(revisions)
142                .ok_or_else(|| task_definition_not_found(td_ref))?,
143        };
144        if td.status != "ACTIVE" {
145            return Err(client_exception(format!(
146                "Task definition {} is not ACTIVE",
147                td.task_definition_arn
148            )));
149        }
150        let td_arn = td.task_definition_arn.clone();
151        let td_family = td.family.clone();
152        let td_revision = td.revision;
153        let td_cpu = td.cpu.clone();
154        let td_memory = td.memory.clone();
155        let td_task_role = td.task_role_arn.clone();
156        let td_exec_role = td.execution_role_arn.clone();
157        let td_containers = td.container_definitions.clone();
158        // RunTask supports propagateTags=TASK_DEFINITION to copy the
159        // TaskDefinition's tags onto each spawned task, in addition to
160        // any tags supplied directly in the request body. Real AWS
161        // unions the two sets; explicit tags win on key conflicts.
162        if propagate_tags.as_deref() == Some("TASK_DEFINITION") {
163            let mut td_tags = td.tags.clone();
164            td_tags.retain(|t| !tags.iter().any(|x| x.key == t.key));
165            tags.extend(td_tags);
166        }
167
168        let mut spawned_tasks: Vec<String> = Vec::new();
169        let mut task_jsons: Vec<Value> = Vec::new();
170        for _ in 0..count {
171            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
172            let task_arn = state.task_arn(&cluster_name, &task_id);
173            let containers: Vec<Container> = td_containers
174                .iter()
175                .map(|def| Container {
176                    container_arn: format!(
177                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
178                        state.region,
179                        state.account_id,
180                        cluster_name,
181                        task_id,
182                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
183                    ),
184                    name: def
185                        .get("name")
186                        .and_then(|v| v.as_str())
187                        .unwrap_or("app")
188                        .to_string(),
189                    image: def
190                        .get("image")
191                        .and_then(|v| v.as_str())
192                        .unwrap_or("")
193                        .to_string(),
194                    task_arn: task_arn.clone(),
195                    last_status: "PENDING".into(),
196                    exit_code: None,
197                    reason: None,
198                    runtime_id: None,
199                    essential: def
200                        .get("essential")
201                        .and_then(|v| v.as_bool())
202                        .unwrap_or(true),
203                    cpu: def
204                        .get("cpu")
205                        .and_then(|v| v.as_i64())
206                        .map(|n| n.to_string()),
207                    memory: def
208                        .get("memory")
209                        .and_then(|v| v.as_i64())
210                        .map(|n| n.to_string()),
211                    memory_reservation: def
212                        .get("memoryReservation")
213                        .and_then(|v| v.as_i64())
214                        .map(|n| n.to_string()),
215                    network_bindings: Vec::new(),
216                    network_interfaces: Vec::new(),
217                    health_status: Some("UNKNOWN".to_string()),
218                    managed_agents: None,
219                    image_digest: None,
220                })
221                .collect();
222            let awslogs = td_containers.iter().find_map(|def| {
223                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
224                let log_cfg = def.get("logConfiguration")?;
225                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
226                    return None;
227                }
228                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
229                Some(AwsLogsConfig {
230                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
231                    stream_prefix: opts
232                        .get("awslogs-stream-prefix")
233                        .and_then(|v| v.as_str())
234                        .map(String::from),
235                    region: opts
236                        .get("awslogs-region")
237                        .and_then(|v| v.as_str())
238                        .unwrap_or(&state.region)
239                        .to_string(),
240                    container_name: name,
241                })
242            });
243            let capacity_provider_name = body
244                .get("capacityProviderStrategy")
245                .and_then(|v| v.as_array())
246                .and_then(|arr| arr.first())
247                .and_then(|item| item.get("capacityProvider"))
248                .and_then(|v| v.as_str())
249                .map(String::from);
250            let mut task = Task {
251                task_arn: task_arn.clone(),
252                task_id: task_id.clone(),
253                cluster_arn: cluster_arn.clone(),
254                cluster_name: cluster_name.clone(),
255                task_definition_arn: td_arn.clone(),
256                family: td_family.clone(),
257                revision: td_revision,
258                container_instance_arn: None,
259                capacity_provider_name,
260                last_status: "PROVISIONING".into(),
261                desired_status: "RUNNING".into(),
262                launch_type: launch_type.clone(),
263                platform_version: Some("1.4.0".into()),
264                cpu: body
265                    .get("overrides")
266                    .and_then(|v| v.get("cpu"))
267                    .and_then(|v| v.as_str())
268                    .map(String::from)
269                    .or_else(|| td_cpu.clone()),
270                memory: body
271                    .get("overrides")
272                    .and_then(|v| v.get("memory"))
273                    .and_then(|v| v.as_str())
274                    .map(String::from)
275                    .or_else(|| td_memory.clone()),
276                containers,
277                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
278                started_by: started_by.clone(),
279                group: group.clone(),
280                connectivity: "CONNECTING".into(),
281                stop_code: None,
282                stopped_reason: None,
283                created_at: Utc::now(),
284                started_at: None,
285                stopping_at: None,
286                stopped_at: None,
287                pull_started_at: None,
288                pull_stopped_at: None,
289                connectivity_at: None,
290                started_by_ref_id: None,
291                execution_role_arn: td_exec_role.clone(),
292                task_role_arn: td_task_role.clone(),
293                tags: tags.clone(),
294                awslogs,
295                captured_logs: String::new(),
296                protection: None,
297                enable_execute_command,
298                attachments: Vec::new(),
299                volume_configurations: volume_configurations.clone(),
300                task_set_arn: None,
301            };
302            // Best-effort placement for EC2 / EXTERNAL launch types.
303            if launch_type != "FARGATE" {
304                if let Some(arn) = crate::placement::select_container_instance(
305                    state,
306                    &cluster_name,
307                    &placement_constraints,
308                    &placement_strategy,
309                    task.group.as_deref(),
310                    &td_arn,
311                    &launch_type,
312                ) {
313                    task.container_instance_arn = Some(arn.clone());
314                    if let Some(ci) = state
315                        .container_instances
316                        .values_mut()
317                        .find(|ci| ci.container_instance_arn == arn)
318                    {
319                        ci.pending_tasks_count += 1;
320                    }
321                }
322            }
323            state.tasks.insert(task_id.clone(), task.clone());
324            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
325                cluster.pending_tasks_count += 1;
326            }
327            // Snapshot-in-progress: transition to PENDING synchronously so
328            // callers that immediately DescribeTasks see movement. RUNNING /
329            // STOPPED come later from the background runtime task.
330            if let Some(t) = state.tasks.get_mut(&task_id) {
331                t.last_status = "PENDING".into();
332            }
333            task_jsons.push(task_to_json(&task));
334            spawned_tasks.push(task_id.clone());
335        }
336        drop(accounts);
337
338        // Launch container execution outside the state lock.
339        if let Some(rt) = runtime {
340            for id in &spawned_tasks {
341                rt.clone()
342                    .run_task(self.state.clone(), id.clone(), account.clone());
343            }
344        } else {
345            // No runtime available — fail fast so the task doesn't stay
346            // PENDING forever. We incremented pending_tasks_count above;
347            // decrement it here so the cluster counter doesn't drift and
348            // block later DeleteCluster calls.
349            let mut accounts = self.state.write();
350            if let Some(state) = accounts.get_mut(&account) {
351                let mut cluster_drains: Vec<String> = Vec::new();
352                for id in &spawned_tasks {
353                    if let Some(t) = state.tasks.get_mut(id) {
354                        t.last_status = "STOPPED".into();
355                        // desired_status stays RUNNING: the task was requested
356                        // to run but failed to start. AWS keeps desired_status
357                        // as RUNNING for failed standalone RunTask until the
358                        // caller explicitly stops it. This also ensures
359                        // list_tasks (default filter=RUNNING) finds it.
360                        t.stop_code = Some("TaskFailedToStart".into());
361                        t.stopped_reason = Some(format!(
362                            "No container runtime available (docker/podman not installed). {}",
363                            fakecloud_core::container_net::CONTAINER_RUNTIME_HINT
364                        ));
365                        t.stopped_at = Some(Utc::now());
366                        for c in t.containers.iter_mut() {
367                            c.last_status = "STOPPED".into();
368                        }
369                        cluster_drains.push(t.cluster_name.clone());
370                    }
371                }
372                for name in cluster_drains {
373                    if let Some(cluster) = state.clusters.get_mut(&name) {
374                        if cluster.pending_tasks_count > 0 {
375                            cluster.pending_tasks_count -= 1;
376                        }
377                    }
378                }
379            }
380        }
381
382        Ok(AwsResponse::ok_json(json!({
383            "tasks": task_jsons,
384            "failures": [],
385        })))
386    }
387
388    pub(super) fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389        // StartTask targets explicit container instances. Our ECS emulator
390        // has no concept of registered container instances yet (Batch 4);
391        // fall through to the same semantics as RunTask so the API is
392        // usable while the container-instance surface is pending.
393        self.run_task(request)
394    }
395
396    pub(super) async fn stop_task(
397        &self,
398        request: &AwsRequest,
399    ) -> Result<AwsResponse, AwsServiceError> {
400        let body = request.json_body();
401        let task_ref = req_str(&body, "task")?;
402        let reason = opt_str(&body, "reason")
403            .unwrap_or("UserInitiated")
404            .to_string();
405        let cluster_ref = opt_str(&body, "cluster");
406        let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
407
408        let (task_id, account, task_snapshot) = {
409            let account = request.account_id.clone();
410            let mut accounts = self.state.write();
411            let state = accounts
412                .get_mut(&account)
413                .ok_or_else(|| task_not_found(task_ref))?;
414            let task_id = resolve_task_id(state, task_ref)?;
415            let task = state
416                .tasks
417                .get_mut(&task_id)
418                .ok_or_else(|| task_not_found(task_ref))?;
419            task.desired_status = "STOPPED".into();
420            task.stopping_at = Some(Utc::now());
421            task.stopped_reason = Some(reason.clone());
422            task.stop_code = Some("UserInitiated".into());
423            (task_id, account, task.clone())
424        };
425        if let Some(rt) = &self.runtime {
426            rt.stop_task(&task_id, &reason).await;
427        }
428        let _ = account;
429        Ok(AwsResponse::ok_json(json!({
430            "task": task_to_json(&task_snapshot),
431        })))
432    }
433
434    pub(super) fn describe_tasks(
435        &self,
436        request: &AwsRequest,
437    ) -> Result<AwsResponse, AwsServiceError> {
438        let body = request.json_body();
439        let refs: Vec<String> = req_array(&body, "tasks")?
440            .iter()
441            .filter_map(|v| v.as_str().map(String::from))
442            .collect();
443        let include_tags = body
444            .get("include")
445            .and_then(|v| v.as_array())
446            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
447            .unwrap_or(false);
448
449        let account = request.account_id.clone();
450        let accounts = self.state.read();
451        let Some(state) = accounts.get(&account) else {
452            return Ok(AwsResponse::ok_json(json!({
453                "tasks": [],
454                "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
455            })));
456        };
457        let mut found = Vec::new();
458        let mut failures = Vec::new();
459        for input in &refs {
460            let task_id = task_id_from_ref(input);
461            match state.tasks.get(&task_id) {
462                Some(t) => {
463                    let mut v = task_to_json(t);
464                    if include_tags {
465                        v.as_object_mut()
466                            .unwrap()
467                            .insert("tags".into(), tags_json(&t.tags));
468                    }
469                    found.push(v);
470                }
471                None => {
472                    failures.push(json!({
473                        "arn": input,
474                        "reason": "MISSING",
475                    }));
476                }
477            }
478        }
479        Ok(AwsResponse::ok_json(json!({
480            "tasks": found,
481            "failures": failures,
482        })))
483    }
484
485    pub(super) fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
486        let body = request.json_body();
487        validate_enum_opt(&body, "desiredStatus", &["RUNNING", "PENDING", "STOPPED"])?;
488        validate_enum_opt(
489            &body,
490            "launchType",
491            &["EC2", "FARGATE", "EXTERNAL", "MANAGED_INSTANCES"],
492        )?;
493        let cluster_ref = opt_str(&body, "cluster");
494        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
495        let family = opt_str(&body, "family");
496        let status_filter = opt_str(&body, "desiredStatus").or(Some("RUNNING"));
497        let started_by = opt_str(&body, "startedBy");
498        let max_results = body
499            .get("maxResults")
500            .and_then(|v| v.as_i64())
501            .filter(|n| (1..=100).contains(n))
502            .map(|n| n as usize)
503            .unwrap_or(100);
504        let next_token = opt_str(&body, "nextToken").unwrap_or("");
505
506        let account = request.account_id.clone();
507        let accounts = self.state.read();
508        let mut arns: Vec<String> = match accounts.get(&account) {
509            Some(state) => state
510                .tasks
511                .values()
512                .filter(|t| t.cluster_name == cluster_name)
513                .filter(|t| family.is_none_or(|f| t.family == f))
514                .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
515                .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
516                .map(|t| t.task_arn.clone())
517                .collect(),
518            None => Vec::new(),
519        };
520        arns.sort();
521        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
522        let end = (start + max_results).min(arns.len());
523        let page = arns[start..end].to_vec();
524        let mut out = json!({"taskArns": page});
525        if end < arns.len() {
526            out.as_object_mut()
527                .unwrap()
528                .insert("nextToken".into(), json!(end.to_string()));
529        }
530        Ok(AwsResponse::ok_json(out))
531    }
532}
533
534#[cfg(test)]
535mod multi_container_tests {
536    use super::*;
537    use crate::EcsService;
538    use bytes::Bytes;
539    use fakecloud_core::multi_account::MultiAccountState;
540    use http::{HeaderMap, Method};
541    use parking_lot::RwLock;
542    use std::collections::HashMap;
543    use std::sync::Arc;
544
545    fn fresh_service() -> EcsService {
546        let accounts: MultiAccountState<EcsState> =
547            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
548        let state = Arc::new(RwLock::new(accounts));
549        let svc = EcsService::new(state.clone());
550        // Pre-create the cluster so RunTask doesn't trip on a missing one.
551        let mut accounts = state.write();
552        let s = accounts.get_or_create("000000000000");
553        let arn = s.cluster_arn("default");
554        s.clusters
555            .insert("default".into(), Cluster::new("default", arn));
556        drop(accounts);
557        svc
558    }
559
560    fn make_request(action: &str, body: Value) -> AwsRequest {
561        let body_bytes = Bytes::from(serde_json::to_vec(&body).unwrap());
562        AwsRequest {
563            service: "ecs".into(),
564            action: action.into(),
565            region: "us-east-1".into(),
566            account_id: "000000000000".into(),
567            request_id: uuid::Uuid::new_v4().to_string(),
568            headers: HeaderMap::new(),
569            query_params: HashMap::new(),
570            body: body_bytes,
571            body_stream: parking_lot::Mutex::new(None),
572            path_segments: Vec::new(),
573            raw_path: "/".into(),
574            raw_query: String::new(),
575            method: Method::POST,
576            is_query_protocol: false,
577            access_key_id: None,
578            principal: None,
579        }
580    }
581
582    #[test]
583    fn register_task_def_with_two_containers_then_run_task_starts_both() {
584        let svc = fresh_service();
585        let reg = make_request(
586            "RegisterTaskDefinition",
587            json!({
588                "family": "multi",
589                "containerDefinitions": [
590                    {"name": "app", "image": "alpine"},
591                    {"name": "sidecar", "image": "alpine"}
592                ]
593            }),
594        );
595        svc.register_task_definition(&reg)
596            .expect("register should succeed");
597
598        let run = make_request(
599            "RunTask",
600            json!({
601                "cluster": "default",
602                "taskDefinition": "multi",
603            }),
604        );
605        let resp = svc.run_task(&run).expect("run_task should succeed");
606        let body: Value =
607            serde_json::from_slice(resp.body.expect_bytes()).expect("body should be valid JSON");
608        let tasks = body
609            .get("tasks")
610            .and_then(|v| v.as_array())
611            .expect("tasks array");
612        assert_eq!(tasks.len(), 1);
613        let task = &tasks[0];
614        let containers = task
615            .get("containers")
616            .and_then(|v| v.as_array())
617            .expect("containers array on task");
618        assert_eq!(containers.len(), 2, "expected both containers in task");
619        let names: Vec<&str> = containers
620            .iter()
621            .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
622            .collect();
623        assert!(names.contains(&"app"));
624        assert!(names.contains(&"sidecar"));
625
626        // Per-container ARNs must be distinct so DescribeTasks can address
627        // each container independently.
628        let arns: std::collections::HashSet<&str> = containers
629            .iter()
630            .filter_map(|c| c.get("containerArn").and_then(|v| v.as_str()))
631            .collect();
632        assert_eq!(arns.len(), 2);
633    }
634
635    #[test]
636    fn register_task_def_defaults_essential_true() {
637        let svc = fresh_service();
638        let reg = make_request(
639            "RegisterTaskDefinition",
640            json!({
641                "family": "default-essential",
642                // No `essential` declared on either container.
643                "containerDefinitions": [
644                    {"name": "main", "image": "alpine"},
645                    {"name": "extra", "image": "alpine"}
646                ]
647            }),
648        );
649        svc.register_task_definition(&reg).unwrap();
650
651        let run = make_request(
652            "RunTask",
653            json!({
654                "cluster": "default",
655                "taskDefinition": "default-essential",
656            }),
657        );
658        let resp = svc.run_task(&run).unwrap();
659        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
660        let containers = body["tasks"][0]["containers"].as_array().unwrap();
661        // The `essential` flag is a TaskDefinition.ContainerDefinition
662        // field, not part of the runtime `Container` response shape.
663        // Real ECS doesn't echo it back on RunTask, and the conformance
664        // probe rejects responses that do.
665        for c in containers {
666            assert!(
667                c.get("essential").is_none(),
668                "container {:?} must not carry `essential` on the runtime shape",
669                c.get("name")
670            );
671        }
672    }
673
674    #[test]
675    fn task_to_json_emits_full_container_array() {
676        // Build a Task with two containers directly and confirm helper
677        // emits both entries in the response shape.
678        let mut task = Task {
679            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
680            task_id: "abc".into(),
681            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
682            cluster_name: "default".into(),
683            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/multi:1"
684                .into(),
685            family: "multi".into(),
686            revision: 1,
687            container_instance_arn: None,
688            capacity_provider_name: None,
689            last_status: "RUNNING".into(),
690            desired_status: "RUNNING".into(),
691            launch_type: "FARGATE".into(),
692            platform_version: None,
693            cpu: None,
694            memory: None,
695            containers: Vec::new(),
696            overrides: json!({}),
697            started_by: None,
698            group: None,
699            connectivity: "CONNECTED".into(),
700            stop_code: None,
701            stopped_reason: None,
702            created_at: chrono::Utc::now(),
703            started_at: None,
704            stopping_at: None,
705            stopped_at: None,
706            pull_started_at: None,
707            pull_stopped_at: None,
708            connectivity_at: None,
709            started_by_ref_id: None,
710            execution_role_arn: None,
711            task_role_arn: None,
712            tags: Vec::new(),
713            awslogs: None,
714            captured_logs: String::new(),
715            protection: None,
716            enable_execute_command: false,
717            attachments: Vec::new(),
718            volume_configurations: Vec::new(),
719            task_set_arn: None,
720        };
721        for name in ["app", "sidecar"] {
722            task.containers.push(Container {
723                container_arn: format!(
724                    "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
725                ),
726                name: name.into(),
727                image: "alpine".into(),
728                task_arn: task.task_arn.clone(),
729                last_status: "RUNNING".into(),
730                exit_code: None,
731                reason: None,
732                runtime_id: Some(format!("docker-{name}")),
733                essential: true,
734                cpu: None,
735                memory: None,
736                memory_reservation: None,
737                network_bindings: Vec::new(),
738                network_interfaces: Vec::new(),
739                health_status: None,
740                managed_agents: None,
741                image_digest: None,
742            });
743        }
744
745        let v = task_to_json(&task);
746        let containers = v
747            .get("containers")
748            .and_then(|v| v.as_array())
749            .expect("containers array");
750        assert_eq!(containers.len(), 2);
751        let names: Vec<&str> = containers
752            .iter()
753            .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
754            .collect();
755        assert_eq!(names, vec!["app", "sidecar"]);
756        for c in containers {
757            assert!(c.get("containerArn").is_some());
758            assert!(c.get("name").is_some());
759            assert!(c.get("lastStatus").is_some());
760            assert!(c.get("runtimeId").is_some());
761            assert!(c.get("essential").is_none());
762        }
763    }
764}
765
766#[cfg(test)]
767mod port_mapping_tests {
768    //! Cover the `portMappings` -> `docker run --publish` translation
769    //! plus the per-container `networkBindings` projected onto the task.
770    //! The argv builder is exercised directly so we don't need a real
771    //! container CLI on the test host.
772    use super::*;
773    use crate::runtime::{
774        build_run_argv, mark_running_multi, ContainerPlan, PortMapping, RunningContainer,
775    };
776    use crate::state::{Container, EcsState};
777    use crate::SharedEcsState;
778    use chrono::Utc;
779    use fakecloud_core::multi_account::MultiAccountState;
780    use parking_lot::RwLock;
781    use std::sync::Arc;
782
783    fn plan_with_ports(
784        port_mappings: Vec<PortMapping>,
785        network_mode: Option<&str>,
786    ) -> ContainerPlan {
787        ContainerPlan {
788            container_name: "app".into(),
789            image: "alpine:latest".into(),
790            env: Vec::new(),
791            entry_point: Vec::new(),
792            command: Vec::new(),
793            secrets_refs: Vec::new(),
794            essential: true,
795            has_task_role: false,
796            port_mappings,
797            network_mode: network_mode.map(String::from),
798            depends_on: Vec::new(),
799            health_check: None,
800            volume_mounts: Vec::new(),
801            ulimits: Vec::new(),
802            linux_parameters: None,
803            stop_timeout: None,
804            user: None,
805            working_directory: None,
806            tty: false,
807            interactive: false,
808            readonly_rootfs: false,
809        }
810    }
811
812    fn argv_string(plan: &ContainerPlan) -> Vec<String> {
813        build_run_argv(
814            plan,
815            &[],
816            "task-1",
817            "host.docker.internal",
818            None,
819            "alpine:latest",
820            true,
821        )
822    }
823
824    /// Helper for asserting a `--publish <spec>` pair is present in argv.
825    /// Returns true when the flag/value pair appears as adjacent entries.
826    fn argv_has_publish(argv: &[String], spec: &str) -> bool {
827        argv.windows(2).any(|w| w[0] == "--publish" && w[1] == spec)
828    }
829
830    #[test]
831    fn port_mappings_translate_to_publish_flags() {
832        let plan = plan_with_ports(
833            vec![PortMapping {
834                container_port: 80,
835                host_port: 8080,
836                protocol: "tcp".into(),
837            }],
838            None,
839        );
840        let argv = argv_string(&plan);
841        assert!(
842            argv_has_publish(&argv, "80:8080/tcp"),
843            "expected --publish 80:8080/tcp in argv: {argv:?}"
844        );
845    }
846
847    #[test]
848    fn port_mappings_default_host_port_to_container_port() {
849        // host_port=0 in the parsed mapping means "AWS host-mode default";
850        // parse_port_mapping rewrites that to containerPort, so by the time
851        // we reach build_run_argv the host_port should already equal 80.
852        // Drive the same path through the JSON parser to lock in the
853        // default behaviour end to end.
854        let parsed =
855            crate::runtime::__test_parse_port_mapping(&serde_json::json!({"containerPort": 80}))
856                .expect("containerPort should parse");
857        assert_eq!(
858            parsed.host_port, 80,
859            "default hostPort should mirror containerPort"
860        );
861        let argv = argv_string(&plan_with_ports(vec![parsed], None));
862        assert!(
863            argv_has_publish(&argv, "80:80/tcp"),
864            "expected --publish 80:80/tcp when hostPort omitted: {argv:?}"
865        );
866    }
867
868    #[test]
869    fn port_mappings_default_protocol_tcp() {
870        let parsed = crate::runtime::__test_parse_port_mapping(
871            &serde_json::json!({"containerPort": 443, "hostPort": 443}),
872        )
873        .expect("containerPort should parse");
874        assert_eq!(parsed.protocol, "tcp");
875        let argv = argv_string(&plan_with_ports(vec![parsed], None));
876        assert!(
877            argv_has_publish(&argv, "443:443/tcp"),
878            "expected default protocol tcp: {argv:?}"
879        );
880    }
881
882    #[test]
883    fn awsvpc_network_mode_skips_publish() {
884        let plan = plan_with_ports(
885            vec![PortMapping {
886                container_port: 80,
887                host_port: 8080,
888                protocol: "tcp".into(),
889            }],
890            Some("awsvpc"),
891        );
892        let argv = argv_string(&plan);
893        assert!(
894            !argv.iter().any(|s| s == "--publish"),
895            "awsvpc must not emit --publish: {argv:?}"
896        );
897    }
898
899    #[test]
900    fn awsvpc_network_mode_includes_network_flag() {
901        let plan = plan_with_ports(Vec::new(), Some("awsvpc"));
902        let argv = argv_string(&plan);
903        let network_idx = argv.iter().position(|s| s == "--network");
904        assert!(
905            network_idx.is_some(),
906            "awsvpc must emit --network: {argv:?}"
907        );
908        let network_name = argv.get(network_idx.unwrap() + 1);
909        assert!(
910            network_name
911                .map(|n| n.starts_with("fakecloud-ecs-"))
912                .unwrap_or(false),
913            "awsvpc must reference fakecloud-ecs network: {argv:?}"
914        );
915    }
916
917    #[test]
918    fn network_bindings_populated_on_task() {
919        // Build a task in state, run mark_running_multi with a started
920        // container that has network_bindings populated, and verify
921        // task_to_json emits them under containers[0].networkBindings.
922        let mut accounts: MultiAccountState<EcsState> =
923            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
924        let acct = accounts.get_or_create("000000000000");
925        let arn = acct.cluster_arn("default");
926        acct.clusters
927            .insert("default".into(), Cluster::new("default", arn));
928        let mut task = Task {
929            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
930            task_id: "abc".into(),
931            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
932            cluster_name: "default".into(),
933            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/web:1".into(),
934            family: "web".into(),
935            revision: 1,
936            container_instance_arn: None,
937            capacity_provider_name: None,
938            last_status: "PENDING".into(),
939            desired_status: "RUNNING".into(),
940            launch_type: "FARGATE".into(),
941            platform_version: None,
942            cpu: None,
943            memory: None,
944            containers: vec![Container {
945                container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/web"
946                    .into(),
947                name: "web".into(),
948                image: "alpine".into(),
949                task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
950                last_status: "PENDING".into(),
951                exit_code: None,
952                reason: None,
953                runtime_id: None,
954                essential: true,
955                cpu: None,
956                memory: None,
957                memory_reservation: None,
958                network_bindings: Vec::new(),
959                network_interfaces: Vec::new(),
960                health_status: None,
961                managed_agents: None,
962                image_digest: None,
963            }],
964            overrides: serde_json::json!({}),
965            started_by: None,
966            group: None,
967            connectivity: "CONNECTING".into(),
968            stop_code: None,
969            stopped_reason: None,
970            created_at: Utc::now(),
971            started_at: None,
972            stopping_at: None,
973            stopped_at: None,
974            pull_started_at: None,
975            pull_stopped_at: None,
976            connectivity_at: None,
977            started_by_ref_id: None,
978            execution_role_arn: None,
979            task_role_arn: None,
980            tags: Vec::new(),
981            awslogs: None,
982            captured_logs: String::new(),
983            protection: None,
984            enable_execute_command: false,
985            attachments: Vec::new(),
986            volume_configurations: Vec::new(),
987            task_set_arn: None,
988        };
989        task.last_status = "PENDING".into();
990        acct.tasks.insert("abc".into(), task);
991        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
992
993        let bindings = vec![serde_json::json!({
994            "bindIP": "0.0.0.0",
995            "containerPort": 80,
996            "hostPort": 8080,
997            "protocol": "tcp",
998        })];
999        let started = vec![RunningContainer {
1000            name: "web".into(),
1001            container_id: "docker-id".into(),
1002            essential: true,
1003            exit_code: None,
1004            network_bindings: bindings.clone(),
1005            image_digest: None,
1006        }];
1007        mark_running_multi(&state, "000000000000", "abc", &started);
1008
1009        let accounts = state.read();
1010        let task = accounts
1011            .get("000000000000")
1012            .unwrap()
1013            .tasks
1014            .get("abc")
1015            .unwrap();
1016        let json = task_to_json(task);
1017        let nb = &json["containers"][0]["networkBindings"];
1018        assert_eq!(nb, &serde_json::Value::Array(bindings));
1019    }
1020}