Skip to main content

fakecloud_ecs/
service_tasks.rs

1// Auto-extracted from service.rs as part of carryover service.rs split.
2
3#![allow(clippy::too_many_arguments)]
4
5use chrono::Utc;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use super::*;
11
12impl EcsService {
13    /// Spawn a task from a cross-service caller (EventBridge Scheduler /
14    /// EventBridge Rules) without going through the AwsRequest dispatch
15    /// path. Builds the JSON body and reuses [`Self::run_task`] so all
16    /// the existing validation / runtime spawn logic runs identically.
17    /// Returns Err with a human-readable message on validation failures —
18    /// the caller decides whether to surface the failure (e.g. DLQ).
19    pub fn run_task_external(
20        &self,
21        account_id: &str,
22        cluster: &str,
23        task_definition: &str,
24        launch_type: Option<&str>,
25        count: usize,
26    ) -> Result<(), String> {
27        use bytes::Bytes;
28        use http::{HeaderMap, Method};
29        use std::collections::HashMap;
30        let body = serde_json::json!({
31            "cluster": cluster,
32            "taskDefinition": task_definition,
33            "launchType": launch_type.unwrap_or("FARGATE"),
34            "count": count.max(1) as i64,
35        });
36        let body_bytes =
37            Bytes::from(serde_json::to_vec(&body).map_err(|e| format!("encode body: {e}"))?);
38        let req = AwsRequest {
39            service: "ecs".into(),
40            action: "RunTask".into(),
41            region: "us-east-1".into(),
42            account_id: account_id.to_string(),
43            request_id: uuid::Uuid::new_v4().to_string(),
44            headers: HeaderMap::new(),
45            query_params: HashMap::new(),
46            body: body_bytes,
47            body_stream: parking_lot::Mutex::new(None),
48            path_segments: Vec::new(),
49            raw_path: "/".into(),
50            raw_query: String::new(),
51            method: Method::POST,
52            is_query_protocol: false,
53            access_key_id: None,
54            principal: None,
55        };
56        self.run_task(&req)
57            .map(|_| ())
58            .map_err(|e| format!("{e:?}"))
59    }
60
61    pub fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
62        let body = request.json_body();
63        let td_ref = req_str(&body, "taskDefinition")?;
64        let cluster_ref = opt_str(&body, "cluster");
65        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
66        let launch_type = opt_str(&body, "launchType")
67            .unwrap_or("FARGATE")
68            .to_string();
69        let placement_constraints: Vec<Value> = body
70            .get("placementConstraints")
71            .and_then(|v| v.as_array())
72            .cloned()
73            .unwrap_or_default();
74        let placement_strategy: Vec<Value> = body
75            .get("placementStrategy")
76            .and_then(|v| v.as_array())
77            .cloned()
78            .unwrap_or_default();
79        let count = body
80            .get("count")
81            .and_then(|v| v.as_i64())
82            .filter(|n| (1..=10).contains(n))
83            .unwrap_or(1) as usize;
84        let group = opt_str(&body, "group").map(String::from);
85        let started_by = opt_str(&body, "startedBy").map(String::from);
86        let enable_execute_command = body
87            .get("enableExecuteCommand")
88            .and_then(|v| v.as_bool())
89            .unwrap_or(false);
90        let propagate_tags = opt_str(&body, "propagateTags").map(String::from);
91        let _enable_ecs_managed_tags = body
92            .get("enableECSManagedTags")
93            .and_then(|v| v.as_bool())
94            .unwrap_or(false);
95        let _capacity_provider_strategy: Vec<Value> = body
96            .get("capacityProviderStrategy")
97            .and_then(|v| v.as_array())
98            .cloned()
99            .unwrap_or_default();
100        let volume_configurations: Vec<Value> = body
101            .get("volumeConfigurations")
102            .and_then(|v| v.as_array())
103            .cloned()
104            .unwrap_or_default();
105        let _availability_zone_rebalancing =
106            opt_str(&body, "availabilityZoneRebalancing").map(String::from);
107        let mut tags = parse_tags(&body);
108
109        // PassRole trust check on any role overrides supplied via the
110        // overrides.taskRoleArn / overrides.executionRoleArn fields.
111        // The base task definition was already checked at Register time,
112        // but RunTask can override either role and AWS re-validates the
113        // trust policy on every call.
114        if let Some(overrides) = body.get("overrides") {
115            if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
116                self.check_pass_role(&request.account_id, role_arn)?;
117            }
118            if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
119                self.check_pass_role(&request.account_id, role_arn)?;
120            }
121        }
122
123        let account = request.account_id.clone();
124        let runtime = self.runtime.clone();
125        let mut accounts = self.state.write();
126        let state = accounts.get_or_create(&account);
127        let cluster_arn = state
128            .clusters
129            .get(&cluster_name)
130            .map(|c| c.cluster_arn.clone())
131            .unwrap_or_else(|| state.cluster_arn(&cluster_name));
132        let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
133        let revisions = state
134            .task_definitions
135            .get(&family)
136            .ok_or_else(|| task_definition_not_found(td_ref))?;
137        let td = match rev {
138            Some(n) => revisions
139                .get(&n)
140                .ok_or_else(|| task_definition_not_found(td_ref))?,
141            None => latest_active_revision(revisions)
142                .ok_or_else(|| task_definition_not_found(td_ref))?,
143        };
144        if td.status != "ACTIVE" {
145            return Err(client_exception(format!(
146                "Task definition {} is not ACTIVE",
147                td.task_definition_arn
148            )));
149        }
150        let td_arn = td.task_definition_arn.clone();
151        let td_family = td.family.clone();
152        let td_revision = td.revision;
153        let td_cpu = td.cpu.clone();
154        let td_memory = td.memory.clone();
155        let td_task_role = td.task_role_arn.clone();
156        let td_exec_role = td.execution_role_arn.clone();
157        let td_containers = td.container_definitions.clone();
158        // RunTask supports propagateTags=TASK_DEFINITION to copy the
159        // TaskDefinition's tags onto each spawned task, in addition to
160        // any tags supplied directly in the request body. Real AWS
161        // unions the two sets; explicit tags win on key conflicts.
162        if propagate_tags.as_deref() == Some("TASK_DEFINITION") {
163            let mut td_tags = td.tags.clone();
164            td_tags.retain(|t| !tags.iter().any(|x| x.key == t.key));
165            tags.extend(td_tags);
166        }
167
168        let mut spawned_tasks: Vec<String> = Vec::new();
169        let mut task_jsons: Vec<Value> = Vec::new();
170        for _ in 0..count {
171            let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
172            let task_arn = state.task_arn(&cluster_name, &task_id);
173            let containers: Vec<Container> = td_containers
174                .iter()
175                .map(|def| Container {
176                    container_arn: format!(
177                        "arn:aws:ecs:{}:{}:container/{}/{}/{}",
178                        state.region,
179                        state.account_id,
180                        cluster_name,
181                        task_id,
182                        def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
183                    ),
184                    name: def
185                        .get("name")
186                        .and_then(|v| v.as_str())
187                        .unwrap_or("app")
188                        .to_string(),
189                    image: def
190                        .get("image")
191                        .and_then(|v| v.as_str())
192                        .unwrap_or("")
193                        .to_string(),
194                    task_arn: task_arn.clone(),
195                    last_status: "PENDING".into(),
196                    exit_code: None,
197                    reason: None,
198                    runtime_id: None,
199                    essential: def
200                        .get("essential")
201                        .and_then(|v| v.as_bool())
202                        .unwrap_or(true),
203                    cpu: def
204                        .get("cpu")
205                        .and_then(|v| v.as_i64())
206                        .map(|n| n.to_string()),
207                    memory: def
208                        .get("memory")
209                        .and_then(|v| v.as_i64())
210                        .map(|n| n.to_string()),
211                    memory_reservation: def
212                        .get("memoryReservation")
213                        .and_then(|v| v.as_i64())
214                        .map(|n| n.to_string()),
215                    network_bindings: Vec::new(),
216                    network_interfaces: Vec::new(),
217                    health_status: Some("UNKNOWN".to_string()),
218                    managed_agents: None,
219                    image_digest: None,
220                })
221                .collect();
222            let awslogs = td_containers.iter().find_map(|def| {
223                let name = def.get("name").and_then(|v| v.as_str())?.to_string();
224                let log_cfg = def.get("logConfiguration")?;
225                if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
226                    return None;
227                }
228                let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
229                Some(AwsLogsConfig {
230                    group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
231                    stream_prefix: opts
232                        .get("awslogs-stream-prefix")
233                        .and_then(|v| v.as_str())
234                        .map(String::from),
235                    region: opts
236                        .get("awslogs-region")
237                        .and_then(|v| v.as_str())
238                        .unwrap_or(&state.region)
239                        .to_string(),
240                    container_name: name,
241                })
242            });
243            let capacity_provider_name = body
244                .get("capacityProviderStrategy")
245                .and_then(|v| v.as_array())
246                .and_then(|arr| arr.first())
247                .and_then(|item| item.get("capacityProvider"))
248                .and_then(|v| v.as_str())
249                .map(String::from);
250            let mut task = Task {
251                task_arn: task_arn.clone(),
252                task_id: task_id.clone(),
253                cluster_arn: cluster_arn.clone(),
254                cluster_name: cluster_name.clone(),
255                task_definition_arn: td_arn.clone(),
256                family: td_family.clone(),
257                revision: td_revision,
258                container_instance_arn: None,
259                capacity_provider_name,
260                last_status: "PROVISIONING".into(),
261                desired_status: "RUNNING".into(),
262                launch_type: launch_type.clone(),
263                platform_version: Some("1.4.0".into()),
264                cpu: body
265                    .get("overrides")
266                    .and_then(|v| v.get("cpu"))
267                    .and_then(|v| v.as_str())
268                    .map(String::from)
269                    .or_else(|| td_cpu.clone()),
270                memory: body
271                    .get("overrides")
272                    .and_then(|v| v.get("memory"))
273                    .and_then(|v| v.as_str())
274                    .map(String::from)
275                    .or_else(|| td_memory.clone()),
276                containers,
277                overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
278                started_by: started_by.clone(),
279                group: group.clone(),
280                connectivity: "CONNECTING".into(),
281                stop_code: None,
282                stopped_reason: None,
283                created_at: Utc::now(),
284                started_at: None,
285                stopping_at: None,
286                stopped_at: None,
287                pull_started_at: None,
288                pull_stopped_at: None,
289                connectivity_at: None,
290                started_by_ref_id: None,
291                execution_role_arn: td_exec_role.clone(),
292                task_role_arn: td_task_role.clone(),
293                tags: tags.clone(),
294                awslogs,
295                captured_logs: String::new(),
296                protection: None,
297                enable_execute_command,
298                attachments: Vec::new(),
299                volume_configurations: volume_configurations.clone(),
300                task_set_arn: None,
301            };
302            // Best-effort placement for EC2 / EXTERNAL launch types.
303            if launch_type != "FARGATE" {
304                if let Some(arn) = crate::placement::select_container_instance(
305                    state,
306                    &cluster_name,
307                    &placement_constraints,
308                    &placement_strategy,
309                    task.group.as_deref(),
310                    &td_arn,
311                    &launch_type,
312                ) {
313                    task.container_instance_arn = Some(arn.clone());
314                    if let Some(ci) = state
315                        .container_instances
316                        .values_mut()
317                        .find(|ci| ci.container_instance_arn == arn)
318                    {
319                        ci.pending_tasks_count += 1;
320                    }
321                }
322            }
323            state.tasks.insert(task_id.clone(), task.clone());
324            if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
325                cluster.pending_tasks_count += 1;
326            }
327            // Snapshot-in-progress: transition to PENDING synchronously so
328            // callers that immediately DescribeTasks see movement. RUNNING /
329            // STOPPED come later from the background runtime task.
330            if let Some(t) = state.tasks.get_mut(&task_id) {
331                t.last_status = "PENDING".into();
332            }
333            task_jsons.push(task_to_json(&task));
334            spawned_tasks.push(task_id.clone());
335        }
336        drop(accounts);
337
338        // Launch container execution outside the state lock.
339        if let Some(rt) = runtime {
340            for id in &spawned_tasks {
341                rt.clone()
342                    .run_task(self.state.clone(), id.clone(), account.clone());
343            }
344        } else {
345            // No runtime available — fail fast so the task doesn't stay
346            // PENDING forever. We incremented pending_tasks_count above;
347            // decrement it here so the cluster counter doesn't drift and
348            // block later DeleteCluster calls.
349            let mut accounts = self.state.write();
350            if let Some(state) = accounts.get_mut(&account) {
351                let mut cluster_drains: Vec<String> = Vec::new();
352                for id in &spawned_tasks {
353                    if let Some(t) = state.tasks.get_mut(id) {
354                        t.last_status = "STOPPED".into();
355                        // desired_status stays RUNNING: the task was requested
356                        // to run but failed to start. AWS keeps desired_status
357                        // as RUNNING for failed standalone RunTask until the
358                        // caller explicitly stops it. This also ensures
359                        // list_tasks (default filter=RUNNING) finds it.
360                        t.stop_code = Some("TaskFailedToStart".into());
361                        t.stopped_reason = Some(
362                            "No container runtime available (docker/podman not installed)".into(),
363                        );
364                        t.stopped_at = Some(Utc::now());
365                        for c in t.containers.iter_mut() {
366                            c.last_status = "STOPPED".into();
367                        }
368                        cluster_drains.push(t.cluster_name.clone());
369                    }
370                }
371                for name in cluster_drains {
372                    if let Some(cluster) = state.clusters.get_mut(&name) {
373                        if cluster.pending_tasks_count > 0 {
374                            cluster.pending_tasks_count -= 1;
375                        }
376                    }
377                }
378            }
379        }
380
381        Ok(AwsResponse::ok_json(json!({
382            "tasks": task_jsons,
383            "failures": [],
384        })))
385    }
386
387    pub(super) fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
388        // StartTask targets explicit container instances. Our ECS emulator
389        // has no concept of registered container instances yet (Batch 4);
390        // fall through to the same semantics as RunTask so the API is
391        // usable while the container-instance surface is pending.
392        self.run_task(request)
393    }
394
395    pub(super) async fn stop_task(
396        &self,
397        request: &AwsRequest,
398    ) -> Result<AwsResponse, AwsServiceError> {
399        let body = request.json_body();
400        let task_ref = req_str(&body, "task")?;
401        let reason = opt_str(&body, "reason")
402            .unwrap_or("UserInitiated")
403            .to_string();
404        let cluster_ref = opt_str(&body, "cluster");
405        let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
406
407        let (task_id, account, task_snapshot) = {
408            let account = request.account_id.clone();
409            let mut accounts = self.state.write();
410            let state = accounts
411                .get_mut(&account)
412                .ok_or_else(|| task_not_found(task_ref))?;
413            let task_id = resolve_task_id(state, task_ref)?;
414            let task = state
415                .tasks
416                .get_mut(&task_id)
417                .ok_or_else(|| task_not_found(task_ref))?;
418            task.desired_status = "STOPPED".into();
419            task.stopping_at = Some(Utc::now());
420            task.stopped_reason = Some(reason.clone());
421            task.stop_code = Some("UserInitiated".into());
422            (task_id, account, task.clone())
423        };
424        if let Some(rt) = &self.runtime {
425            rt.stop_task(&task_id, &reason).await;
426        }
427        let _ = account;
428        Ok(AwsResponse::ok_json(json!({
429            "task": task_to_json(&task_snapshot),
430        })))
431    }
432
433    pub(super) fn describe_tasks(
434        &self,
435        request: &AwsRequest,
436    ) -> Result<AwsResponse, AwsServiceError> {
437        let body = request.json_body();
438        let refs: Vec<String> = req_array(&body, "tasks")?
439            .iter()
440            .filter_map(|v| v.as_str().map(String::from))
441            .collect();
442        let include_tags = body
443            .get("include")
444            .and_then(|v| v.as_array())
445            .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
446            .unwrap_or(false);
447
448        let account = request.account_id.clone();
449        let accounts = self.state.read();
450        let Some(state) = accounts.get(&account) else {
451            return Ok(AwsResponse::ok_json(json!({
452                "tasks": [],
453                "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
454            })));
455        };
456        let mut found = Vec::new();
457        let mut failures = Vec::new();
458        for input in &refs {
459            let task_id = task_id_from_ref(input);
460            match state.tasks.get(&task_id) {
461                Some(t) => {
462                    let mut v = task_to_json(t);
463                    if include_tags {
464                        v.as_object_mut()
465                            .unwrap()
466                            .insert("tags".into(), tags_json(&t.tags));
467                    }
468                    found.push(v);
469                }
470                None => {
471                    failures.push(json!({
472                        "arn": input,
473                        "reason": "MISSING",
474                    }));
475                }
476            }
477        }
478        Ok(AwsResponse::ok_json(json!({
479            "tasks": found,
480            "failures": failures,
481        })))
482    }
483
484    pub(super) fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
485        let body = request.json_body();
486        validate_enum_opt(&body, "desiredStatus", &["RUNNING", "PENDING", "STOPPED"])?;
487        validate_enum_opt(
488            &body,
489            "launchType",
490            &["EC2", "FARGATE", "EXTERNAL", "MANAGED_INSTANCES"],
491        )?;
492        let cluster_ref = opt_str(&body, "cluster");
493        let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
494        let family = opt_str(&body, "family");
495        let status_filter = opt_str(&body, "desiredStatus").or(Some("RUNNING"));
496        let started_by = opt_str(&body, "startedBy");
497        let max_results = body
498            .get("maxResults")
499            .and_then(|v| v.as_i64())
500            .filter(|n| (1..=100).contains(n))
501            .map(|n| n as usize)
502            .unwrap_or(100);
503        let next_token = opt_str(&body, "nextToken").unwrap_or("");
504
505        let account = request.account_id.clone();
506        let accounts = self.state.read();
507        let mut arns: Vec<String> = match accounts.get(&account) {
508            Some(state) => state
509                .tasks
510                .values()
511                .filter(|t| t.cluster_name == cluster_name)
512                .filter(|t| family.is_none_or(|f| t.family == f))
513                .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
514                .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
515                .map(|t| t.task_arn.clone())
516                .collect(),
517            None => Vec::new(),
518        };
519        arns.sort();
520        let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
521        let end = (start + max_results).min(arns.len());
522        let page = arns[start..end].to_vec();
523        let mut out = json!({"taskArns": page});
524        if end < arns.len() {
525            out.as_object_mut()
526                .unwrap()
527                .insert("nextToken".into(), json!(end.to_string()));
528        }
529        Ok(AwsResponse::ok_json(out))
530    }
531}
532
533#[cfg(test)]
534mod multi_container_tests {
535    use super::*;
536    use crate::EcsService;
537    use bytes::Bytes;
538    use fakecloud_core::multi_account::MultiAccountState;
539    use http::{HeaderMap, Method};
540    use parking_lot::RwLock;
541    use std::collections::HashMap;
542    use std::sync::Arc;
543
544    fn fresh_service() -> EcsService {
545        let accounts: MultiAccountState<EcsState> =
546            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
547        let state = Arc::new(RwLock::new(accounts));
548        let svc = EcsService::new(state.clone());
549        // Pre-create the cluster so RunTask doesn't trip on a missing one.
550        let mut accounts = state.write();
551        let s = accounts.get_or_create("000000000000");
552        let arn = s.cluster_arn("default");
553        s.clusters
554            .insert("default".into(), Cluster::new("default", arn));
555        drop(accounts);
556        svc
557    }
558
559    fn make_request(action: &str, body: Value) -> AwsRequest {
560        let body_bytes = Bytes::from(serde_json::to_vec(&body).unwrap());
561        AwsRequest {
562            service: "ecs".into(),
563            action: action.into(),
564            region: "us-east-1".into(),
565            account_id: "000000000000".into(),
566            request_id: uuid::Uuid::new_v4().to_string(),
567            headers: HeaderMap::new(),
568            query_params: HashMap::new(),
569            body: body_bytes,
570            body_stream: parking_lot::Mutex::new(None),
571            path_segments: Vec::new(),
572            raw_path: "/".into(),
573            raw_query: String::new(),
574            method: Method::POST,
575            is_query_protocol: false,
576            access_key_id: None,
577            principal: None,
578        }
579    }
580
581    #[test]
582    fn register_task_def_with_two_containers_then_run_task_starts_both() {
583        let svc = fresh_service();
584        let reg = make_request(
585            "RegisterTaskDefinition",
586            json!({
587                "family": "multi",
588                "containerDefinitions": [
589                    {"name": "app", "image": "alpine"},
590                    {"name": "sidecar", "image": "alpine"}
591                ]
592            }),
593        );
594        svc.register_task_definition(&reg)
595            .expect("register should succeed");
596
597        let run = make_request(
598            "RunTask",
599            json!({
600                "cluster": "default",
601                "taskDefinition": "multi",
602            }),
603        );
604        let resp = svc.run_task(&run).expect("run_task should succeed");
605        let body: Value =
606            serde_json::from_slice(resp.body.expect_bytes()).expect("body should be valid JSON");
607        let tasks = body
608            .get("tasks")
609            .and_then(|v| v.as_array())
610            .expect("tasks array");
611        assert_eq!(tasks.len(), 1);
612        let task = &tasks[0];
613        let containers = task
614            .get("containers")
615            .and_then(|v| v.as_array())
616            .expect("containers array on task");
617        assert_eq!(containers.len(), 2, "expected both containers in task");
618        let names: Vec<&str> = containers
619            .iter()
620            .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
621            .collect();
622        assert!(names.contains(&"app"));
623        assert!(names.contains(&"sidecar"));
624
625        // Per-container ARNs must be distinct so DescribeTasks can address
626        // each container independently.
627        let arns: std::collections::HashSet<&str> = containers
628            .iter()
629            .filter_map(|c| c.get("containerArn").and_then(|v| v.as_str()))
630            .collect();
631        assert_eq!(arns.len(), 2);
632    }
633
634    #[test]
635    fn register_task_def_defaults_essential_true() {
636        let svc = fresh_service();
637        let reg = make_request(
638            "RegisterTaskDefinition",
639            json!({
640                "family": "default-essential",
641                // No `essential` declared on either container.
642                "containerDefinitions": [
643                    {"name": "main", "image": "alpine"},
644                    {"name": "extra", "image": "alpine"}
645                ]
646            }),
647        );
648        svc.register_task_definition(&reg).unwrap();
649
650        let run = make_request(
651            "RunTask",
652            json!({
653                "cluster": "default",
654                "taskDefinition": "default-essential",
655            }),
656        );
657        let resp = svc.run_task(&run).unwrap();
658        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
659        let containers = body["tasks"][0]["containers"].as_array().unwrap();
660        // The `essential` flag is a TaskDefinition.ContainerDefinition
661        // field, not part of the runtime `Container` response shape.
662        // Real ECS doesn't echo it back on RunTask, and the conformance
663        // probe rejects responses that do.
664        for c in containers {
665            assert!(
666                c.get("essential").is_none(),
667                "container {:?} must not carry `essential` on the runtime shape",
668                c.get("name")
669            );
670        }
671    }
672
673    #[test]
674    fn task_to_json_emits_full_container_array() {
675        // Build a Task with two containers directly and confirm helper
676        // emits both entries in the response shape.
677        let mut task = Task {
678            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
679            task_id: "abc".into(),
680            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
681            cluster_name: "default".into(),
682            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/multi:1"
683                .into(),
684            family: "multi".into(),
685            revision: 1,
686            container_instance_arn: None,
687            capacity_provider_name: None,
688            last_status: "RUNNING".into(),
689            desired_status: "RUNNING".into(),
690            launch_type: "FARGATE".into(),
691            platform_version: None,
692            cpu: None,
693            memory: None,
694            containers: Vec::new(),
695            overrides: json!({}),
696            started_by: None,
697            group: None,
698            connectivity: "CONNECTED".into(),
699            stop_code: None,
700            stopped_reason: None,
701            created_at: chrono::Utc::now(),
702            started_at: None,
703            stopping_at: None,
704            stopped_at: None,
705            pull_started_at: None,
706            pull_stopped_at: None,
707            connectivity_at: None,
708            started_by_ref_id: None,
709            execution_role_arn: None,
710            task_role_arn: None,
711            tags: Vec::new(),
712            awslogs: None,
713            captured_logs: String::new(),
714            protection: None,
715            enable_execute_command: false,
716            attachments: Vec::new(),
717            volume_configurations: Vec::new(),
718            task_set_arn: None,
719        };
720        for name in ["app", "sidecar"] {
721            task.containers.push(Container {
722                container_arn: format!(
723                    "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
724                ),
725                name: name.into(),
726                image: "alpine".into(),
727                task_arn: task.task_arn.clone(),
728                last_status: "RUNNING".into(),
729                exit_code: None,
730                reason: None,
731                runtime_id: Some(format!("docker-{name}")),
732                essential: true,
733                cpu: None,
734                memory: None,
735                memory_reservation: None,
736                network_bindings: Vec::new(),
737                network_interfaces: Vec::new(),
738                health_status: None,
739                managed_agents: None,
740                image_digest: None,
741            });
742        }
743
744        let v = task_to_json(&task);
745        let containers = v
746            .get("containers")
747            .and_then(|v| v.as_array())
748            .expect("containers array");
749        assert_eq!(containers.len(), 2);
750        let names: Vec<&str> = containers
751            .iter()
752            .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
753            .collect();
754        assert_eq!(names, vec!["app", "sidecar"]);
755        for c in containers {
756            assert!(c.get("containerArn").is_some());
757            assert!(c.get("name").is_some());
758            assert!(c.get("lastStatus").is_some());
759            assert!(c.get("runtimeId").is_some());
760            assert!(c.get("essential").is_none());
761        }
762    }
763}
764
765#[cfg(test)]
766mod port_mapping_tests {
767    //! Cover the `portMappings` -> `docker run --publish` translation
768    //! plus the per-container `networkBindings` projected onto the task.
769    //! The argv builder is exercised directly so we don't need a real
770    //! container CLI on the test host.
771    use super::*;
772    use crate::runtime::{
773        build_run_argv, mark_running_multi, ContainerPlan, PortMapping, RunningContainer,
774    };
775    use crate::state::{Container, EcsState};
776    use crate::SharedEcsState;
777    use chrono::Utc;
778    use fakecloud_core::multi_account::MultiAccountState;
779    use parking_lot::RwLock;
780    use std::sync::Arc;
781
782    fn plan_with_ports(
783        port_mappings: Vec<PortMapping>,
784        network_mode: Option<&str>,
785    ) -> ContainerPlan {
786        ContainerPlan {
787            container_name: "app".into(),
788            image: "alpine:latest".into(),
789            env: Vec::new(),
790            entry_point: Vec::new(),
791            command: Vec::new(),
792            secrets_refs: Vec::new(),
793            essential: true,
794            has_task_role: false,
795            port_mappings,
796            network_mode: network_mode.map(String::from),
797            depends_on: Vec::new(),
798            health_check: None,
799            volume_mounts: Vec::new(),
800            ulimits: Vec::new(),
801            linux_parameters: None,
802            stop_timeout: None,
803            user: None,
804            working_directory: None,
805            tty: false,
806            interactive: false,
807            readonly_rootfs: false,
808        }
809    }
810
811    fn argv_string(plan: &ContainerPlan) -> Vec<String> {
812        build_run_argv(
813            plan,
814            &[],
815            "task-1",
816            "host.docker.internal",
817            None,
818            "alpine:latest",
819            true,
820        )
821    }
822
823    /// Helper for asserting a `--publish <spec>` pair is present in argv.
824    /// Returns true when the flag/value pair appears as adjacent entries.
825    fn argv_has_publish(argv: &[String], spec: &str) -> bool {
826        argv.windows(2).any(|w| w[0] == "--publish" && w[1] == spec)
827    }
828
829    #[test]
830    fn port_mappings_translate_to_publish_flags() {
831        let plan = plan_with_ports(
832            vec![PortMapping {
833                container_port: 80,
834                host_port: 8080,
835                protocol: "tcp".into(),
836            }],
837            None,
838        );
839        let argv = argv_string(&plan);
840        assert!(
841            argv_has_publish(&argv, "80:8080/tcp"),
842            "expected --publish 80:8080/tcp in argv: {argv:?}"
843        );
844    }
845
846    #[test]
847    fn port_mappings_default_host_port_to_container_port() {
848        // host_port=0 in the parsed mapping means "AWS host-mode default";
849        // parse_port_mapping rewrites that to containerPort, so by the time
850        // we reach build_run_argv the host_port should already equal 80.
851        // Drive the same path through the JSON parser to lock in the
852        // default behaviour end to end.
853        let parsed =
854            crate::runtime::__test_parse_port_mapping(&serde_json::json!({"containerPort": 80}))
855                .expect("containerPort should parse");
856        assert_eq!(
857            parsed.host_port, 80,
858            "default hostPort should mirror containerPort"
859        );
860        let argv = argv_string(&plan_with_ports(vec![parsed], None));
861        assert!(
862            argv_has_publish(&argv, "80:80/tcp"),
863            "expected --publish 80:80/tcp when hostPort omitted: {argv:?}"
864        );
865    }
866
867    #[test]
868    fn port_mappings_default_protocol_tcp() {
869        let parsed = crate::runtime::__test_parse_port_mapping(
870            &serde_json::json!({"containerPort": 443, "hostPort": 443}),
871        )
872        .expect("containerPort should parse");
873        assert_eq!(parsed.protocol, "tcp");
874        let argv = argv_string(&plan_with_ports(vec![parsed], None));
875        assert!(
876            argv_has_publish(&argv, "443:443/tcp"),
877            "expected default protocol tcp: {argv:?}"
878        );
879    }
880
881    #[test]
882    fn awsvpc_network_mode_skips_publish() {
883        let plan = plan_with_ports(
884            vec![PortMapping {
885                container_port: 80,
886                host_port: 8080,
887                protocol: "tcp".into(),
888            }],
889            Some("awsvpc"),
890        );
891        let argv = argv_string(&plan);
892        assert!(
893            !argv.iter().any(|s| s == "--publish"),
894            "awsvpc must not emit --publish: {argv:?}"
895        );
896    }
897
898    #[test]
899    fn awsvpc_network_mode_includes_network_flag() {
900        let plan = plan_with_ports(Vec::new(), Some("awsvpc"));
901        let argv = argv_string(&plan);
902        let network_idx = argv.iter().position(|s| s == "--network");
903        assert!(
904            network_idx.is_some(),
905            "awsvpc must emit --network: {argv:?}"
906        );
907        let network_name = argv.get(network_idx.unwrap() + 1);
908        assert!(
909            network_name
910                .map(|n| n.starts_with("fakecloud-ecs-"))
911                .unwrap_or(false),
912            "awsvpc must reference fakecloud-ecs network: {argv:?}"
913        );
914    }
915
916    #[test]
917    fn network_bindings_populated_on_task() {
918        // Build a task in state, run mark_running_multi with a started
919        // container that has network_bindings populated, and verify
920        // task_to_json emits them under containers[0].networkBindings.
921        let mut accounts: MultiAccountState<EcsState> =
922            MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
923        let acct = accounts.get_or_create("000000000000");
924        let arn = acct.cluster_arn("default");
925        acct.clusters
926            .insert("default".into(), Cluster::new("default", arn));
927        let mut task = Task {
928            task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
929            task_id: "abc".into(),
930            cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
931            cluster_name: "default".into(),
932            task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/web:1".into(),
933            family: "web".into(),
934            revision: 1,
935            container_instance_arn: None,
936            capacity_provider_name: None,
937            last_status: "PENDING".into(),
938            desired_status: "RUNNING".into(),
939            launch_type: "FARGATE".into(),
940            platform_version: None,
941            cpu: None,
942            memory: None,
943            containers: vec![Container {
944                container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/web"
945                    .into(),
946                name: "web".into(),
947                image: "alpine".into(),
948                task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
949                last_status: "PENDING".into(),
950                exit_code: None,
951                reason: None,
952                runtime_id: None,
953                essential: true,
954                cpu: None,
955                memory: None,
956                memory_reservation: None,
957                network_bindings: Vec::new(),
958                network_interfaces: Vec::new(),
959                health_status: None,
960                managed_agents: None,
961                image_digest: None,
962            }],
963            overrides: serde_json::json!({}),
964            started_by: None,
965            group: None,
966            connectivity: "CONNECTING".into(),
967            stop_code: None,
968            stopped_reason: None,
969            created_at: Utc::now(),
970            started_at: None,
971            stopping_at: None,
972            stopped_at: None,
973            pull_started_at: None,
974            pull_stopped_at: None,
975            connectivity_at: None,
976            started_by_ref_id: None,
977            execution_role_arn: None,
978            task_role_arn: None,
979            tags: Vec::new(),
980            awslogs: None,
981            captured_logs: String::new(),
982            protection: None,
983            enable_execute_command: false,
984            attachments: Vec::new(),
985            volume_configurations: Vec::new(),
986            task_set_arn: None,
987        };
988        task.last_status = "PENDING".into();
989        acct.tasks.insert("abc".into(), task);
990        let state: SharedEcsState = Arc::new(RwLock::new(accounts));
991
992        let bindings = vec![serde_json::json!({
993            "bindIP": "0.0.0.0",
994            "containerPort": 80,
995            "hostPort": 8080,
996            "protocol": "tcp",
997        })];
998        let started = vec![RunningContainer {
999            name: "web".into(),
1000            container_id: "docker-id".into(),
1001            essential: true,
1002            exit_code: None,
1003            network_bindings: bindings.clone(),
1004            image_digest: None,
1005        }];
1006        mark_running_multi(&state, "000000000000", "abc", &started);
1007
1008        let accounts = state.read();
1009        let task = accounts
1010            .get("000000000000")
1011            .unwrap()
1012            .tasks
1013            .get("abc")
1014            .unwrap();
1015        let json = task_to_json(task);
1016        let nb = &json["containers"][0]["networkBindings"];
1017        assert_eq!(nb, &serde_json::Value::Array(bindings));
1018    }
1019}