1#![allow(clippy::too_many_arguments)]
4
5use chrono::Utc;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsServiceError};
9
10use super::*;
11
12impl EcsService {
13 pub fn run_task_external(
20 &self,
21 account_id: &str,
22 cluster: &str,
23 task_definition: &str,
24 launch_type: Option<&str>,
25 count: usize,
26 ) -> Result<(), String> {
27 use bytes::Bytes;
28 use http::{HeaderMap, Method};
29 use std::collections::HashMap;
30 let body = serde_json::json!({
31 "cluster": cluster,
32 "taskDefinition": task_definition,
33 "launchType": launch_type.unwrap_or("FARGATE"),
34 "count": count.max(1) as i64,
35 });
36 let body_bytes =
37 Bytes::from(serde_json::to_vec(&body).map_err(|e| format!("encode body: {e}"))?);
38 let req = AwsRequest {
39 service: "ecs".into(),
40 action: "RunTask".into(),
41 region: "us-east-1".into(),
42 account_id: account_id.to_string(),
43 request_id: uuid::Uuid::new_v4().to_string(),
44 headers: HeaderMap::new(),
45 query_params: HashMap::new(),
46 body: body_bytes,
47 body_stream: parking_lot::Mutex::new(None),
48 path_segments: Vec::new(),
49 raw_path: "/".into(),
50 raw_query: String::new(),
51 method: Method::POST,
52 is_query_protocol: false,
53 access_key_id: None,
54 principal: None,
55 };
56 self.run_task(&req)
57 .map(|_| ())
58 .map_err(|e| format!("{e:?}"))
59 }
60
61 pub fn run_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
62 let body = request.json_body();
63 let td_ref = req_str(&body, "taskDefinition")?;
64 let cluster_ref = opt_str(&body, "cluster");
65 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
66 let launch_type = opt_str(&body, "launchType")
67 .unwrap_or("FARGATE")
68 .to_string();
69 let placement_constraints: Vec<Value> = body
70 .get("placementConstraints")
71 .and_then(|v| v.as_array())
72 .cloned()
73 .unwrap_or_default();
74 let placement_strategy: Vec<Value> = body
75 .get("placementStrategy")
76 .and_then(|v| v.as_array())
77 .cloned()
78 .unwrap_or_default();
79 let count = body
80 .get("count")
81 .and_then(|v| v.as_i64())
82 .filter(|n| (1..=10).contains(n))
83 .unwrap_or(1) as usize;
84 let group = opt_str(&body, "group").map(String::from);
85 let started_by = opt_str(&body, "startedBy").map(String::from);
86 let enable_execute_command = body
87 .get("enableExecuteCommand")
88 .and_then(|v| v.as_bool())
89 .unwrap_or(false);
90 let propagate_tags = opt_str(&body, "propagateTags").map(String::from);
91 let _enable_ecs_managed_tags = body
92 .get("enableECSManagedTags")
93 .and_then(|v| v.as_bool())
94 .unwrap_or(false);
95 let _capacity_provider_strategy: Vec<Value> = body
96 .get("capacityProviderStrategy")
97 .and_then(|v| v.as_array())
98 .cloned()
99 .unwrap_or_default();
100 let volume_configurations: Vec<Value> = body
101 .get("volumeConfigurations")
102 .and_then(|v| v.as_array())
103 .cloned()
104 .unwrap_or_default();
105 let _availability_zone_rebalancing =
106 opt_str(&body, "availabilityZoneRebalancing").map(String::from);
107 let mut tags = parse_tags(&body);
108
109 if let Some(overrides) = body.get("overrides") {
115 if let Some(role_arn) = opt_str(overrides, "taskRoleArn") {
116 self.check_pass_role(&request.account_id, role_arn)?;
117 }
118 if let Some(role_arn) = opt_str(overrides, "executionRoleArn") {
119 self.check_pass_role(&request.account_id, role_arn)?;
120 }
121 }
122
123 let account = request.account_id.clone();
124 let runtime = self.runtime.clone();
125 let mut accounts = self.state.write();
126 let state = accounts.get_or_create(&account);
127 let cluster_arn = state
128 .clusters
129 .get(&cluster_name)
130 .map(|c| c.cluster_arn.clone())
131 .unwrap_or_else(|| state.cluster_arn(&cluster_name));
132 let (_, family, rev) = resolve_task_definition_ref(td_ref)?;
133 let revisions = state
134 .task_definitions
135 .get(&family)
136 .ok_or_else(|| task_definition_not_found(td_ref))?;
137 let td = match rev {
138 Some(n) => revisions
139 .get(&n)
140 .ok_or_else(|| task_definition_not_found(td_ref))?,
141 None => latest_active_revision(revisions)
142 .ok_or_else(|| task_definition_not_found(td_ref))?,
143 };
144 if td.status != "ACTIVE" {
145 return Err(client_exception(format!(
146 "Task definition {} is not ACTIVE",
147 td.task_definition_arn
148 )));
149 }
150 let td_arn = td.task_definition_arn.clone();
151 let td_family = td.family.clone();
152 let td_revision = td.revision;
153 let td_cpu = td.cpu.clone();
154 let td_memory = td.memory.clone();
155 let td_task_role = td.task_role_arn.clone();
156 let td_exec_role = td.execution_role_arn.clone();
157 let td_containers = td.container_definitions.clone();
158 if propagate_tags.as_deref() == Some("TASK_DEFINITION") {
163 let mut td_tags = td.tags.clone();
164 td_tags.retain(|t| !tags.iter().any(|x| x.key == t.key));
165 tags.extend(td_tags);
166 }
167
168 let mut spawned_tasks: Vec<String> = Vec::new();
169 let mut task_jsons: Vec<Value> = Vec::new();
170 for _ in 0..count {
171 let task_id = uuid::Uuid::new_v4().to_string().replace('-', "");
172 let task_arn = state.task_arn(&cluster_name, &task_id);
173 let containers: Vec<Container> = td_containers
174 .iter()
175 .map(|def| Container {
176 container_arn: format!(
177 "arn:aws:ecs:{}:{}:container/{}/{}/{}",
178 state.region,
179 state.account_id,
180 cluster_name,
181 task_id,
182 def.get("name").and_then(|v| v.as_str()).unwrap_or("app")
183 ),
184 name: def
185 .get("name")
186 .and_then(|v| v.as_str())
187 .unwrap_or("app")
188 .to_string(),
189 image: def
190 .get("image")
191 .and_then(|v| v.as_str())
192 .unwrap_or("")
193 .to_string(),
194 task_arn: task_arn.clone(),
195 last_status: "PENDING".into(),
196 exit_code: None,
197 reason: None,
198 runtime_id: None,
199 essential: def
200 .get("essential")
201 .and_then(|v| v.as_bool())
202 .unwrap_or(true),
203 cpu: def
204 .get("cpu")
205 .and_then(|v| v.as_i64())
206 .map(|n| n.to_string()),
207 memory: def
208 .get("memory")
209 .and_then(|v| v.as_i64())
210 .map(|n| n.to_string()),
211 memory_reservation: def
212 .get("memoryReservation")
213 .and_then(|v| v.as_i64())
214 .map(|n| n.to_string()),
215 network_bindings: Vec::new(),
216 network_interfaces: Vec::new(),
217 health_status: Some("UNKNOWN".to_string()),
218 managed_agents: None,
219 image_digest: None,
220 })
221 .collect();
222 let awslogs = td_containers.iter().find_map(|def| {
223 let name = def.get("name").and_then(|v| v.as_str())?.to_string();
224 let log_cfg = def.get("logConfiguration")?;
225 if log_cfg.get("logDriver").and_then(|v| v.as_str()) != Some("awslogs") {
226 return None;
227 }
228 let opts = log_cfg.get("options").and_then(|v| v.as_object())?;
229 Some(AwsLogsConfig {
230 group: opts.get("awslogs-group").and_then(|v| v.as_str())?.into(),
231 stream_prefix: opts
232 .get("awslogs-stream-prefix")
233 .and_then(|v| v.as_str())
234 .map(String::from),
235 region: opts
236 .get("awslogs-region")
237 .and_then(|v| v.as_str())
238 .unwrap_or(&state.region)
239 .to_string(),
240 container_name: name,
241 })
242 });
243 let capacity_provider_name = body
244 .get("capacityProviderStrategy")
245 .and_then(|v| v.as_array())
246 .and_then(|arr| arr.first())
247 .and_then(|item| item.get("capacityProvider"))
248 .and_then(|v| v.as_str())
249 .map(String::from);
250 let mut task = Task {
251 task_arn: task_arn.clone(),
252 task_id: task_id.clone(),
253 cluster_arn: cluster_arn.clone(),
254 cluster_name: cluster_name.clone(),
255 task_definition_arn: td_arn.clone(),
256 family: td_family.clone(),
257 revision: td_revision,
258 container_instance_arn: None,
259 capacity_provider_name,
260 last_status: "PROVISIONING".into(),
261 desired_status: "RUNNING".into(),
262 launch_type: launch_type.clone(),
263 platform_version: Some("1.4.0".into()),
264 cpu: body
265 .get("overrides")
266 .and_then(|v| v.get("cpu"))
267 .and_then(|v| v.as_str())
268 .map(String::from)
269 .or_else(|| td_cpu.clone()),
270 memory: body
271 .get("overrides")
272 .and_then(|v| v.get("memory"))
273 .and_then(|v| v.as_str())
274 .map(String::from)
275 .or_else(|| td_memory.clone()),
276 containers,
277 overrides: body.get("overrides").cloned().unwrap_or_else(|| json!({})),
278 started_by: started_by.clone(),
279 group: group.clone(),
280 connectivity: "CONNECTING".into(),
281 stop_code: None,
282 stopped_reason: None,
283 created_at: Utc::now(),
284 started_at: None,
285 stopping_at: None,
286 stopped_at: None,
287 pull_started_at: None,
288 pull_stopped_at: None,
289 connectivity_at: None,
290 started_by_ref_id: None,
291 execution_role_arn: td_exec_role.clone(),
292 task_role_arn: td_task_role.clone(),
293 tags: tags.clone(),
294 awslogs,
295 captured_logs: String::new(),
296 protection: None,
297 enable_execute_command,
298 attachments: Vec::new(),
299 volume_configurations: volume_configurations.clone(),
300 task_set_arn: None,
301 };
302 if launch_type != "FARGATE" {
304 if let Some(arn) = crate::placement::select_container_instance(
305 state,
306 &cluster_name,
307 &placement_constraints,
308 &placement_strategy,
309 task.group.as_deref(),
310 &td_arn,
311 &launch_type,
312 ) {
313 task.container_instance_arn = Some(arn.clone());
314 if let Some(ci) = state
315 .container_instances
316 .values_mut()
317 .find(|ci| ci.container_instance_arn == arn)
318 {
319 ci.pending_tasks_count += 1;
320 }
321 }
322 }
323 state.tasks.insert(task_id.clone(), task.clone());
324 if let Some(cluster) = state.clusters.get_mut(&cluster_name) {
325 cluster.pending_tasks_count += 1;
326 }
327 if let Some(t) = state.tasks.get_mut(&task_id) {
331 t.last_status = "PENDING".into();
332 }
333 task_jsons.push(task_to_json(&task));
334 spawned_tasks.push(task_id.clone());
335 }
336 drop(accounts);
337
338 if let Some(rt) = runtime {
340 for id in &spawned_tasks {
341 rt.clone()
342 .run_task(self.state.clone(), id.clone(), account.clone());
343 }
344 } else {
345 let mut accounts = self.state.write();
350 if let Some(state) = accounts.get_mut(&account) {
351 let mut cluster_drains: Vec<String> = Vec::new();
352 for id in &spawned_tasks {
353 if let Some(t) = state.tasks.get_mut(id) {
354 t.last_status = "STOPPED".into();
355 t.stop_code = Some("TaskFailedToStart".into());
361 t.stopped_reason = Some(
362 "No container runtime available (docker/podman not installed)".into(),
363 );
364 t.stopped_at = Some(Utc::now());
365 for c in t.containers.iter_mut() {
366 c.last_status = "STOPPED".into();
367 }
368 cluster_drains.push(t.cluster_name.clone());
369 }
370 }
371 for name in cluster_drains {
372 if let Some(cluster) = state.clusters.get_mut(&name) {
373 if cluster.pending_tasks_count > 0 {
374 cluster.pending_tasks_count -= 1;
375 }
376 }
377 }
378 }
379 }
380
381 Ok(AwsResponse::ok_json(json!({
382 "tasks": task_jsons,
383 "failures": [],
384 })))
385 }
386
387 pub(super) fn start_task(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
388 self.run_task(request)
393 }
394
395 pub(super) async fn stop_task(
396 &self,
397 request: &AwsRequest,
398 ) -> Result<AwsResponse, AwsServiceError> {
399 let body = request.json_body();
400 let task_ref = req_str(&body, "task")?;
401 let reason = opt_str(&body, "reason")
402 .unwrap_or("UserInitiated")
403 .to_string();
404 let cluster_ref = opt_str(&body, "cluster");
405 let _cluster_name = EcsState::resolve_cluster_name(cluster_ref);
406
407 let (task_id, account, task_snapshot) = {
408 let account = request.account_id.clone();
409 let mut accounts = self.state.write();
410 let state = accounts
411 .get_mut(&account)
412 .ok_or_else(|| task_not_found(task_ref))?;
413 let task_id = resolve_task_id(state, task_ref)?;
414 let task = state
415 .tasks
416 .get_mut(&task_id)
417 .ok_or_else(|| task_not_found(task_ref))?;
418 task.desired_status = "STOPPED".into();
419 task.stopping_at = Some(Utc::now());
420 task.stopped_reason = Some(reason.clone());
421 task.stop_code = Some("UserInitiated".into());
422 (task_id, account, task.clone())
423 };
424 if let Some(rt) = &self.runtime {
425 rt.stop_task(&task_id, &reason).await;
426 }
427 let _ = account;
428 Ok(AwsResponse::ok_json(json!({
429 "task": task_to_json(&task_snapshot),
430 })))
431 }
432
433 pub(super) fn describe_tasks(
434 &self,
435 request: &AwsRequest,
436 ) -> Result<AwsResponse, AwsServiceError> {
437 let body = request.json_body();
438 let refs: Vec<String> = body
439 .get("tasks")
440 .and_then(|v| v.as_array())
441 .map(|arr| {
442 arr.iter()
443 .filter_map(|v| v.as_str().map(String::from))
444 .collect()
445 })
446 .unwrap_or_default();
447 let include_tags = body
448 .get("include")
449 .and_then(|v| v.as_array())
450 .map(|arr| arr.iter().any(|v| v.as_str() == Some("TAGS")))
451 .unwrap_or(false);
452
453 let account = request.account_id.clone();
454 let accounts = self.state.read();
455 let Some(state) = accounts.get(&account) else {
456 return Ok(AwsResponse::ok_json(json!({
457 "tasks": [],
458 "failures": refs.iter().map(|r| json!({"arn": r, "reason": "MISSING"})).collect::<Vec<_>>(),
459 })));
460 };
461 let mut found = Vec::new();
462 let mut failures = Vec::new();
463 for input in &refs {
464 let task_id = task_id_from_ref(input);
465 match state.tasks.get(&task_id) {
466 Some(t) => {
467 let mut v = task_to_json(t);
468 if include_tags {
469 v.as_object_mut()
470 .unwrap()
471 .insert("tags".into(), tags_json(&t.tags));
472 }
473 found.push(v);
474 }
475 None => {
476 failures.push(json!({
477 "arn": input,
478 "reason": "MISSING",
479 }));
480 }
481 }
482 }
483 Ok(AwsResponse::ok_json(json!({
484 "tasks": found,
485 "failures": failures,
486 })))
487 }
488
489 pub(super) fn list_tasks(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
490 let body = request.json_body();
491 let cluster_ref = opt_str(&body, "cluster");
492 let cluster_name = EcsState::resolve_cluster_name(cluster_ref);
493 let family = opt_str(&body, "family");
494 let status_filter = opt_str(&body, "desiredStatus").or(Some("RUNNING"));
495 let started_by = opt_str(&body, "startedBy");
496 let max_results = body
497 .get("maxResults")
498 .and_then(|v| v.as_i64())
499 .filter(|n| (1..=100).contains(n))
500 .map(|n| n as usize)
501 .unwrap_or(100);
502 let next_token = opt_str(&body, "nextToken").unwrap_or("");
503
504 let account = request.account_id.clone();
505 let accounts = self.state.read();
506 let mut arns: Vec<String> = match accounts.get(&account) {
507 Some(state) => state
508 .tasks
509 .values()
510 .filter(|t| t.cluster_name == cluster_name)
511 .filter(|t| family.is_none_or(|f| t.family == f))
512 .filter(|t| status_filter.is_none_or(|s| t.desired_status == s))
513 .filter(|t| started_by.is_none_or(|s| t.started_by.as_deref() == Some(s)))
514 .map(|t| t.task_arn.clone())
515 .collect(),
516 None => Vec::new(),
517 };
518 arns.sort();
519 let start = next_token.parse::<usize>().unwrap_or(0).min(arns.len());
520 let end = (start + max_results).min(arns.len());
521 let page = arns[start..end].to_vec();
522 let mut out = json!({"taskArns": page});
523 if end < arns.len() {
524 out.as_object_mut()
525 .unwrap()
526 .insert("nextToken".into(), json!(end.to_string()));
527 }
528 Ok(AwsResponse::ok_json(out))
529 }
530}
531
532#[cfg(test)]
533mod multi_container_tests {
534 use super::*;
535 use crate::EcsService;
536 use bytes::Bytes;
537 use fakecloud_core::multi_account::MultiAccountState;
538 use http::{HeaderMap, Method};
539 use parking_lot::RwLock;
540 use std::collections::HashMap;
541 use std::sync::Arc;
542
543 fn fresh_service() -> EcsService {
544 let accounts: MultiAccountState<EcsState> =
545 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
546 let state = Arc::new(RwLock::new(accounts));
547 let svc = EcsService::new(state.clone());
548 let mut accounts = state.write();
550 let s = accounts.get_or_create("000000000000");
551 let arn = s.cluster_arn("default");
552 s.clusters
553 .insert("default".into(), Cluster::new("default", arn));
554 drop(accounts);
555 svc
556 }
557
558 fn make_request(action: &str, body: Value) -> AwsRequest {
559 let body_bytes = Bytes::from(serde_json::to_vec(&body).unwrap());
560 AwsRequest {
561 service: "ecs".into(),
562 action: action.into(),
563 region: "us-east-1".into(),
564 account_id: "000000000000".into(),
565 request_id: uuid::Uuid::new_v4().to_string(),
566 headers: HeaderMap::new(),
567 query_params: HashMap::new(),
568 body: body_bytes,
569 body_stream: parking_lot::Mutex::new(None),
570 path_segments: Vec::new(),
571 raw_path: "/".into(),
572 raw_query: String::new(),
573 method: Method::POST,
574 is_query_protocol: false,
575 access_key_id: None,
576 principal: None,
577 }
578 }
579
580 #[test]
581 fn register_task_def_with_two_containers_then_run_task_starts_both() {
582 let svc = fresh_service();
583 let reg = make_request(
584 "RegisterTaskDefinition",
585 json!({
586 "family": "multi",
587 "containerDefinitions": [
588 {"name": "app", "image": "alpine"},
589 {"name": "sidecar", "image": "alpine"}
590 ]
591 }),
592 );
593 svc.register_task_definition(®)
594 .expect("register should succeed");
595
596 let run = make_request(
597 "RunTask",
598 json!({
599 "cluster": "default",
600 "taskDefinition": "multi",
601 }),
602 );
603 let resp = svc.run_task(&run).expect("run_task should succeed");
604 let body: Value =
605 serde_json::from_slice(resp.body.expect_bytes()).expect("body should be valid JSON");
606 let tasks = body
607 .get("tasks")
608 .and_then(|v| v.as_array())
609 .expect("tasks array");
610 assert_eq!(tasks.len(), 1);
611 let task = &tasks[0];
612 let containers = task
613 .get("containers")
614 .and_then(|v| v.as_array())
615 .expect("containers array on task");
616 assert_eq!(containers.len(), 2, "expected both containers in task");
617 let names: Vec<&str> = containers
618 .iter()
619 .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
620 .collect();
621 assert!(names.contains(&"app"));
622 assert!(names.contains(&"sidecar"));
623
624 let arns: std::collections::HashSet<&str> = containers
627 .iter()
628 .filter_map(|c| c.get("containerArn").and_then(|v| v.as_str()))
629 .collect();
630 assert_eq!(arns.len(), 2);
631 }
632
633 #[test]
634 fn register_task_def_defaults_essential_true() {
635 let svc = fresh_service();
636 let reg = make_request(
637 "RegisterTaskDefinition",
638 json!({
639 "family": "default-essential",
640 "containerDefinitions": [
642 {"name": "main", "image": "alpine"},
643 {"name": "extra", "image": "alpine"}
644 ]
645 }),
646 );
647 svc.register_task_definition(®).unwrap();
648
649 let run = make_request(
650 "RunTask",
651 json!({
652 "cluster": "default",
653 "taskDefinition": "default-essential",
654 }),
655 );
656 let resp = svc.run_task(&run).unwrap();
657 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
658 let containers = body["tasks"][0]["containers"].as_array().unwrap();
659 for c in containers {
660 assert_eq!(
661 c.get("essential").and_then(|v| v.as_bool()),
662 Some(true),
663 "container {:?} should default essential=true",
664 c.get("name")
665 );
666 }
667 }
668
669 #[test]
670 fn task_to_json_emits_full_container_array() {
671 let mut task = Task {
674 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
675 task_id: "abc".into(),
676 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
677 cluster_name: "default".into(),
678 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/multi:1"
679 .into(),
680 family: "multi".into(),
681 revision: 1,
682 container_instance_arn: None,
683 capacity_provider_name: None,
684 last_status: "RUNNING".into(),
685 desired_status: "RUNNING".into(),
686 launch_type: "FARGATE".into(),
687 platform_version: None,
688 cpu: None,
689 memory: None,
690 containers: Vec::new(),
691 overrides: json!({}),
692 started_by: None,
693 group: None,
694 connectivity: "CONNECTED".into(),
695 stop_code: None,
696 stopped_reason: None,
697 created_at: chrono::Utc::now(),
698 started_at: None,
699 stopping_at: None,
700 stopped_at: None,
701 pull_started_at: None,
702 pull_stopped_at: None,
703 connectivity_at: None,
704 started_by_ref_id: None,
705 execution_role_arn: None,
706 task_role_arn: None,
707 tags: Vec::new(),
708 awslogs: None,
709 captured_logs: String::new(),
710 protection: None,
711 enable_execute_command: false,
712 attachments: Vec::new(),
713 volume_configurations: Vec::new(),
714 task_set_arn: None,
715 };
716 for name in ["app", "sidecar"] {
717 task.containers.push(Container {
718 container_arn: format!(
719 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
720 ),
721 name: name.into(),
722 image: "alpine".into(),
723 task_arn: task.task_arn.clone(),
724 last_status: "RUNNING".into(),
725 exit_code: None,
726 reason: None,
727 runtime_id: Some(format!("docker-{name}")),
728 essential: true,
729 cpu: None,
730 memory: None,
731 memory_reservation: None,
732 network_bindings: Vec::new(),
733 network_interfaces: Vec::new(),
734 health_status: None,
735 managed_agents: None,
736 image_digest: None,
737 });
738 }
739
740 let v = task_to_json(&task);
741 let containers = v
742 .get("containers")
743 .and_then(|v| v.as_array())
744 .expect("containers array");
745 assert_eq!(containers.len(), 2);
746 let names: Vec<&str> = containers
747 .iter()
748 .filter_map(|c| c.get("name").and_then(|v| v.as_str()))
749 .collect();
750 assert_eq!(names, vec!["app", "sidecar"]);
751 for c in containers {
752 assert!(c.get("containerArn").is_some());
753 assert!(c.get("name").is_some());
754 assert!(c.get("lastStatus").is_some());
755 assert!(c.get("runtimeId").is_some());
756 assert_eq!(c.get("essential").and_then(|v| v.as_bool()), Some(true));
757 }
758 }
759}
760
761#[cfg(test)]
762mod port_mapping_tests {
763 use super::*;
768 use crate::runtime::{
769 build_run_argv, mark_running_multi, ContainerPlan, PortMapping, RunningContainer,
770 };
771 use crate::state::{Container, EcsState};
772 use crate::SharedEcsState;
773 use chrono::Utc;
774 use fakecloud_core::multi_account::MultiAccountState;
775 use parking_lot::RwLock;
776 use std::sync::Arc;
777
778 fn plan_with_ports(
779 port_mappings: Vec<PortMapping>,
780 network_mode: Option<&str>,
781 ) -> ContainerPlan {
782 ContainerPlan {
783 container_name: "app".into(),
784 image: "alpine:latest".into(),
785 env: Vec::new(),
786 entry_point: Vec::new(),
787 command: Vec::new(),
788 secrets_refs: Vec::new(),
789 essential: true,
790 has_task_role: false,
791 port_mappings,
792 network_mode: network_mode.map(String::from),
793 depends_on: Vec::new(),
794 health_check: None,
795 volume_mounts: Vec::new(),
796 ulimits: Vec::new(),
797 linux_parameters: None,
798 stop_timeout: None,
799 user: None,
800 working_directory: None,
801 tty: false,
802 interactive: false,
803 readonly_rootfs: false,
804 }
805 }
806
807 fn argv_string(plan: &ContainerPlan) -> Vec<String> {
808 build_run_argv(plan, &[], "task-1", "host-gateway", "alpine:latest")
809 }
810
811 fn argv_has_publish(argv: &[String], spec: &str) -> bool {
814 argv.windows(2).any(|w| w[0] == "--publish" && w[1] == spec)
815 }
816
817 #[test]
818 fn port_mappings_translate_to_publish_flags() {
819 let plan = plan_with_ports(
820 vec![PortMapping {
821 container_port: 80,
822 host_port: 8080,
823 protocol: "tcp".into(),
824 }],
825 None,
826 );
827 let argv = argv_string(&plan);
828 assert!(
829 argv_has_publish(&argv, "80:8080/tcp"),
830 "expected --publish 80:8080/tcp in argv: {argv:?}"
831 );
832 }
833
834 #[test]
835 fn port_mappings_default_host_port_to_container_port() {
836 let parsed =
842 crate::runtime::__test_parse_port_mapping(&serde_json::json!({"containerPort": 80}))
843 .expect("containerPort should parse");
844 assert_eq!(
845 parsed.host_port, 80,
846 "default hostPort should mirror containerPort"
847 );
848 let argv = argv_string(&plan_with_ports(vec![parsed], None));
849 assert!(
850 argv_has_publish(&argv, "80:80/tcp"),
851 "expected --publish 80:80/tcp when hostPort omitted: {argv:?}"
852 );
853 }
854
855 #[test]
856 fn port_mappings_default_protocol_tcp() {
857 let parsed = crate::runtime::__test_parse_port_mapping(
858 &serde_json::json!({"containerPort": 443, "hostPort": 443}),
859 )
860 .expect("containerPort should parse");
861 assert_eq!(parsed.protocol, "tcp");
862 let argv = argv_string(&plan_with_ports(vec![parsed], None));
863 assert!(
864 argv_has_publish(&argv, "443:443/tcp"),
865 "expected default protocol tcp: {argv:?}"
866 );
867 }
868
869 #[test]
870 fn awsvpc_network_mode_skips_publish() {
871 let plan = plan_with_ports(
872 vec![PortMapping {
873 container_port: 80,
874 host_port: 8080,
875 protocol: "tcp".into(),
876 }],
877 Some("awsvpc"),
878 );
879 let argv = argv_string(&plan);
880 assert!(
881 !argv.iter().any(|s| s == "--publish"),
882 "awsvpc must not emit --publish: {argv:?}"
883 );
884 }
885
886 #[test]
887 fn awsvpc_network_mode_includes_network_flag() {
888 let plan = plan_with_ports(Vec::new(), Some("awsvpc"));
889 let argv = argv_string(&plan);
890 let network_idx = argv.iter().position(|s| s == "--network");
891 assert!(
892 network_idx.is_some(),
893 "awsvpc must emit --network: {argv:?}"
894 );
895 let network_name = argv.get(network_idx.unwrap() + 1);
896 assert!(
897 network_name
898 .map(|n| n.starts_with("fakecloud-ecs-"))
899 .unwrap_or(false),
900 "awsvpc must reference fakecloud-ecs network: {argv:?}"
901 );
902 }
903
904 #[test]
905 fn network_bindings_populated_on_task() {
906 let mut accounts: MultiAccountState<EcsState> =
910 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
911 let acct = accounts.get_or_create("000000000000");
912 let arn = acct.cluster_arn("default");
913 acct.clusters
914 .insert("default".into(), Cluster::new("default", arn));
915 let mut task = Task {
916 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
917 task_id: "abc".into(),
918 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
919 cluster_name: "default".into(),
920 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/web:1".into(),
921 family: "web".into(),
922 revision: 1,
923 container_instance_arn: None,
924 capacity_provider_name: None,
925 last_status: "PENDING".into(),
926 desired_status: "RUNNING".into(),
927 launch_type: "FARGATE".into(),
928 platform_version: None,
929 cpu: None,
930 memory: None,
931 containers: vec![Container {
932 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/web"
933 .into(),
934 name: "web".into(),
935 image: "alpine".into(),
936 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
937 last_status: "PENDING".into(),
938 exit_code: None,
939 reason: None,
940 runtime_id: None,
941 essential: true,
942 cpu: None,
943 memory: None,
944 memory_reservation: None,
945 network_bindings: Vec::new(),
946 network_interfaces: Vec::new(),
947 health_status: None,
948 managed_agents: None,
949 image_digest: None,
950 }],
951 overrides: serde_json::json!({}),
952 started_by: None,
953 group: None,
954 connectivity: "CONNECTING".into(),
955 stop_code: None,
956 stopped_reason: None,
957 created_at: Utc::now(),
958 started_at: None,
959 stopping_at: None,
960 stopped_at: None,
961 pull_started_at: None,
962 pull_stopped_at: None,
963 connectivity_at: None,
964 started_by_ref_id: None,
965 execution_role_arn: None,
966 task_role_arn: None,
967 tags: Vec::new(),
968 awslogs: None,
969 captured_logs: String::new(),
970 protection: None,
971 enable_execute_command: false,
972 attachments: Vec::new(),
973 volume_configurations: Vec::new(),
974 task_set_arn: None,
975 };
976 task.last_status = "PENDING".into();
977 acct.tasks.insert("abc".into(), task);
978 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
979
980 let bindings = vec![serde_json::json!({
981 "bindIP": "0.0.0.0",
982 "containerPort": 80,
983 "hostPort": 8080,
984 "protocol": "tcp",
985 })];
986 let started = vec![RunningContainer {
987 name: "web".into(),
988 container_id: "docker-id".into(),
989 essential: true,
990 exit_code: None,
991 network_bindings: bindings.clone(),
992 image_digest: None,
993 }];
994 mark_running_multi(&state, "000000000000", "abc", &started);
995
996 let accounts = state.read();
997 let task = accounts
998 .get("000000000000")
999 .unwrap()
1000 .tasks
1001 .get("abc")
1002 .unwrap();
1003 let json = task_to_json(task);
1004 let nb = &json["containers"][0]["networkBindings"];
1005 assert_eq!(nb, &serde_json::Value::Array(bindings));
1006 }
1007}