1use std::path::PathBuf;
11use std::sync::Arc;
12use std::time::Duration;
13
14use base64::Engine;
15use chrono::Utc;
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_logs::ingest::{append_events, IngestEvent};
18use fakecloud_logs::SharedLogsState;
19use fakecloud_secretsmanager::SharedSecretsManagerState;
20use fakecloud_ssm::SharedSsmState;
21use parking_lot::RwLock;
22use tempfile::TempDir;
23use tokio::process::Command;
24
25use crate::state::{LifecycleEvent, SharedEcsState};
26
27#[derive(Debug, thiserror::Error)]
28pub enum RuntimeError {
29 #[error("container CLI not found (tried docker, podman)")]
30 NoCli,
31 #[error("image pull failed: {0}")]
32 ImagePull(String),
33 #[error("container start failed: {0}")]
34 ContainerStart(String),
35 #[error("docker wait failed: {0}")]
36 Wait(String),
37}
38
39pub struct EcsRuntime {
41 cli: String,
42 host_ip: String,
43 server_port: u16,
48 docker_config: Option<Arc<TempDir>>,
53 containers: RwLock<std::collections::HashMap<String, Vec<(String, String)>>>,
58 delivery_bus: Option<Arc<DeliveryBus>>,
62 logs_state: Option<SharedLogsState>,
66 secretsmanager_state: Option<SharedSecretsManagerState>,
69 ssm_state: Option<SharedSsmState>,
72}
73
74impl EcsRuntime {
75 pub fn new(server_port: u16) -> Option<Self> {
80 let cli = if let Ok(cli) = std::env::var("FAKECLOUD_CONTAINER_CLI") {
81 if cli_works(&cli) {
82 cli
83 } else {
84 return None;
85 }
86 } else if cli_works("docker") {
87 "docker".to_string()
88 } else if cli_works("podman") {
89 "podman".to_string()
90 } else {
91 return None;
92 };
93 let host_ip = if cfg!(target_os = "linux") {
94 "172.17.0.1".to_string()
95 } else {
96 "host-gateway".to_string()
97 };
98 let docker_config = build_local_registry_docker_config(server_port).map(Arc::new);
99 Some(Self {
100 cli,
101 host_ip,
102 server_port,
103 docker_config,
104 containers: RwLock::new(std::collections::HashMap::new()),
105 delivery_bus: None,
106 logs_state: None,
107 secretsmanager_state: None,
108 ssm_state: None,
109 })
110 }
111
112 fn docker_config_path(&self) -> Option<PathBuf> {
116 self.docker_config.as_ref().map(|d| d.path().to_path_buf())
117 }
118
119 fn cli_command(&self) -> Command {
122 let mut cmd = Command::new(&self.cli);
123 if let Some(p) = self.docker_config_path() {
124 cmd.env("DOCKER_CONFIG", p);
125 }
126 cmd
127 }
128
129 pub fn cli_name(&self) -> &str {
130 &self.cli
131 }
132
133 pub fn with_delivery_bus(mut self, bus: Arc<DeliveryBus>) -> Self {
136 self.delivery_bus = Some(bus);
137 self
138 }
139
140 fn register_lb_targets(&self, state: &SharedEcsState, account_id: &str, task_id: &str) {
141 let Some(ref bus) = self.delivery_bus else {
142 return;
143 };
144 let accounts = state.read();
145 let Some(s) = accounts.get(account_id) else {
146 return;
147 };
148 let Some(task) = s.tasks.get(task_id) else {
149 return;
150 };
151 let targets = compute_elbv2_targets(s, task);
152 drop(accounts);
153 for (tg_arn, tg_targets) in targets {
154 bus.register_elbv2_targets(account_id, &tg_arn, tg_targets);
155 }
156 }
157
158 fn deregister_lb_targets(&self, state: &SharedEcsState, account_id: &str, task_id: &str) {
159 let Some(ref bus) = self.delivery_bus else {
160 return;
161 };
162 let accounts = state.read();
163 let Some(s) = accounts.get(account_id) else {
164 return;
165 };
166 let Some(task) = s.tasks.get(task_id) else {
167 return;
168 };
169 let targets = compute_elbv2_targets(s, task);
170 drop(accounts);
171 for (tg_arn, tg_targets) in targets {
172 bus.deregister_elbv2_targets(account_id, &tg_arn, tg_targets);
173 }
174 }
175
176 pub fn with_logs(mut self, logs: SharedLogsState) -> Self {
179 self.logs_state = Some(logs);
180 self
181 }
182
183 pub fn with_secretsmanager(mut self, state: SharedSecretsManagerState) -> Self {
186 self.secretsmanager_state = Some(state);
187 self
188 }
189
190 pub fn with_ssm(mut self, state: SharedSsmState) -> Self {
193 self.ssm_state = Some(state);
194 self
195 }
196
197 pub fn run_task(self: Arc<Self>, state: SharedEcsState, task_id: String, account_id: String) {
202 let rt = self.clone();
203 tokio::spawn(async move {
204 if let Err(err) = rt.run_task_inner(&state, &task_id, &account_id).await {
205 tracing::warn!(%err, task = %task_id, "ecs task execution failed");
206 eprintln!("[ecs] task {task_id} failed: {err}");
209 finalize_failure(&state, &account_id, &task_id, &err.to_string());
210 rt.emit_state_change(
211 &state,
212 &account_id,
213 &task_id,
214 "STOPPED",
215 Some(("TaskFailedToStart", err.to_string())),
216 );
217 }
218 });
219 }
220
221 async fn run_task_inner(
222 &self,
223 state: &SharedEcsState,
224 task_id: &str,
225 account_id: &str,
226 ) -> Result<(), RuntimeError> {
227 let plans = build_container_plans(state, account_id, task_id, self.server_port)?;
231 if plans.is_empty() {
232 return Err(RuntimeError::ContainerStart(
233 "task has no containers".into(),
234 ));
235 }
236
237 let mut resolved_plans: Vec<ResolvedContainerPlan> = Vec::with_capacity(plans.len());
241 for plan in plans {
242 let mut env = plan.env.clone();
243 for (name, value_from) in &plan.secrets_refs {
244 match self.resolve_secret(account_id, value_from) {
245 Some(v) => env.push((name.clone(), v)),
246 None => {
247 return Err(RuntimeError::ContainerStart(format!(
248 "failed to resolve secret {name} from {value_from}"
249 )));
250 }
251 }
252 }
253 if plan.has_task_role {
254 env.push((
255 "AWS_CONTAINER_CREDENTIALS_FULL_URI".into(),
256 format!(
257 "http://host.docker.internal:{}/_fakecloud/ecs/creds/{}",
258 self.server_port, task_id
259 ),
260 ));
261 }
262 env.push((
263 "ECS_CONTAINER_METADATA_URI".into(),
264 format!(
265 "http://host.docker.internal:{}/_fakecloud/ecs/v3/{}",
266 self.server_port, task_id
267 ),
268 ));
269 env.push((
270 "ECS_CONTAINER_METADATA_URI_V4".into(),
271 format!(
272 "http://host.docker.internal:{}/_fakecloud/ecs/v4/{}",
273 self.server_port, task_id
274 ),
275 ));
276 resolved_plans.push(ResolvedContainerPlan { plan, env });
277 }
278
279 mark_pull_started(state, account_id, task_id);
282 let mut run_images: Vec<String> = Vec::with_capacity(resolved_plans.len());
283 let mut image_digests: Vec<Option<String>> = Vec::with_capacity(resolved_plans.len());
284 for rp in &resolved_plans {
285 let local_pull_uri =
286 fakecloud_core::ecr_uri::translate_to_local(&rp.plan.image, self.server_port);
287 let pull_uri = local_pull_uri.as_deref().unwrap_or(&rp.plan.image);
288 let pull_out = self
289 .cli_command()
290 .args(["pull", pull_uri])
291 .output()
292 .await
293 .map_err(|e| RuntimeError::ImagePull(e.to_string()))?;
294 if !pull_out.status.success() {
295 let err = String::from_utf8_lossy(&pull_out.stderr).to_string();
296 return Err(RuntimeError::ImagePull(err));
297 }
298 let run_image = if let Some(ref local_uri) = local_pull_uri {
303 if fakecloud_core::ecr_uri::is_digest_ref(&rp.plan.image) {
304 local_uri.clone()
305 } else {
306 let _ = self
307 .cli_command()
308 .args(["tag", local_uri, &rp.plan.image])
309 .output()
310 .await;
311 rp.plan.image.clone()
312 }
313 } else {
314 rp.plan.image.clone()
315 };
316 let digest = self.lookup_image_digest(pull_uri).await;
321 run_images.push(run_image);
322 image_digests.push(digest);
323 }
324 mark_pull_stopped(state, account_id, task_id);
325
326 let awsvpc_network = resolved_plans
331 .iter()
332 .any(|rp| rp.plan.network_mode.as_deref() == Some("awsvpc"));
333 let network_name = format!("fakecloud-ecs-{}", task_id);
334 let network_created = if awsvpc_network {
335 let create = Command::new(&self.cli)
336 .args([
337 "network",
338 "create",
339 "--driver",
340 "bridge",
341 "--label",
342 &format!("fakecloud-ecs-task={}", task_id),
343 &network_name,
344 ])
345 .output()
346 .await;
347 match create {
348 Ok(out) if out.status.success() => {
349 tracing::info!(
350 task = %task_id,
351 network = %network_name,
352 "created awsvpc docker network"
353 );
354 true
355 }
356 Ok(out) => {
357 let err = String::from_utf8_lossy(&out.stderr);
358 tracing::warn!(
359 task = %task_id,
360 network = %network_name,
361 error = %err,
362 "awsvpc network creation failed; falling back to default bridge"
363 );
364 false
365 }
366 Err(e) => {
367 tracing::warn!(
368 task = %task_id,
369 network = %network_name,
370 error = %e,
371 "awsvpc network creation failed; falling back to default bridge"
372 );
373 false
374 }
375 }
376 } else {
377 false
378 };
379
380 if network_created {
381 let eni_id = format!(
382 "eni-{}",
383 uuid::Uuid::new_v4()
384 .to_string()
385 .replace('-', "")
386 .get(..17)
387 .unwrap_or("")
388 );
389 let mac = format!(
390 "02:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}",
391 rand::random::<u8>(),
392 rand::random::<u8>(),
393 rand::random::<u8>(),
394 rand::random::<u8>(),
395 rand::random::<u8>()
396 );
397 let ip = format!("10.0.{}.{}", rand::random::<u8>(), rand::random::<u8>());
398 let mut accounts = state.write();
399 if let Some(st) = accounts.get_mut(account_id) {
400 if let Some(task) = st.tasks.get_mut(task_id) {
401 task.attachments.push(crate::state::TaskAttachment {
402 id: eni_id.clone(),
403 attachment_type: "eni".into(),
404 status: "ATTACHED".into(),
405 details: vec![
406 crate::state::AttachmentDetail {
407 name: "subnetId".into(),
408 value: "subnet-fakecloud".into(),
409 },
410 crate::state::AttachmentDetail {
411 name: "privateIPv4Address".into(),
412 value: ip.clone(),
413 },
414 crate::state::AttachmentDetail {
415 name: "macAddress".into(),
416 value: mac.clone(),
417 },
418 ],
419 });
420 }
421 }
422 tracing::info!(
423 task = %task_id,
424 eni = %eni_id,
425 ip = %ip,
426 "populated awsvpc ENI attachment"
427 );
428 }
429
430 let mut started: Vec<RunningContainer> = Vec::with_capacity(resolved_plans.len());
438 for (idx, (rp, run_image)) in resolved_plans.iter().zip(run_images.iter()).enumerate() {
439 for dep in &rp.plan.depends_on {
444 let upstream = match started.iter().find(|c| c.name == dep.container_name) {
445 Some(u) => u,
446 None => continue,
450 };
451 let upstream_has_health_check = resolved_plans
455 .iter()
456 .find(|p| p.plan.container_name == dep.container_name)
457 .is_some_and(|p| p.plan.health_check.is_some());
458 if let Err(err) = self
459 .wait_for_depends_on(upstream, dep.condition, upstream_has_health_check)
460 .await
461 {
462 self.cleanup_partial_start(&started);
463 return Err(err);
464 }
465 }
466 let argv = build_run_argv(&rp.plan, &rp.env, task_id, &self.host_ip, run_image);
467 let mut cmd = Command::new(&self.cli);
468 cmd.args(&argv);
469 let run_out = cmd.output().await.map_err(|e| {
470 self.cleanup_partial_start(&started);
472 RuntimeError::ContainerStart(e.to_string())
473 })?;
474 if !run_out.status.success() {
475 let err = String::from_utf8_lossy(&run_out.stderr).to_string();
476 self.cleanup_partial_start(&started);
477 return Err(RuntimeError::ContainerStart(err));
478 }
479 let container_id = String::from_utf8_lossy(&run_out.stdout).trim().to_string();
480 started.push(RunningContainer {
481 name: rp.plan.container_name.clone(),
482 container_id,
483 essential: rp.plan.essential,
484 exit_code: None,
485 network_bindings: network_bindings_for(&rp.plan),
486 image_digest: image_digests.get(idx).cloned().unwrap_or(None),
487 });
488 }
489
490 {
493 let mut guard = self.containers.write();
494 guard.insert(
495 task_id.to_string(),
496 started
497 .iter()
498 .map(|c| (c.name.clone(), c.container_id.clone()))
499 .collect(),
500 );
501 }
502 mark_running_multi(state, account_id, task_id, &started);
503 self.register_lb_targets(state, account_id, task_id);
504 self.emit_state_change(state, account_id, task_id, "RUNNING", None);
505
506 let wait_outcome = self
514 .wait_for_task_exit_with_health(state, account_id, task_id, &started)
515 .await?;
516
517 let mut final_containers = started.clone();
520 for (i, rc) in started.iter().enumerate() {
521 if Some(i) == wait_outcome.exited_index {
522 final_containers[i].exit_code = Some(wait_outcome.exit_code);
523 continue;
524 }
525 let inspect = Command::new(&self.cli)
529 .args(["inspect", "-f", "{{.State.ExitCode}}", &rc.container_id])
530 .output()
531 .await;
532 let still_running = match inspect {
533 Ok(out) if out.status.success() => {
534 let s = String::from_utf8_lossy(&out.stdout).trim().to_string();
535 let running = Command::new(&self.cli)
538 .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
539 .output()
540 .await
541 .map(|o| String::from_utf8_lossy(&o.stdout).trim() == "true")
542 .unwrap_or(false);
543 if !running {
544 if let Ok(code) = s.parse::<i64>() {
545 final_containers[i].exit_code = Some(code);
546 }
547 }
548 running
549 }
550 _ => false,
551 };
552 if still_running {
553 let _ = Command::new(&self.cli)
554 .args(["stop", "--time", "10", &rc.container_id])
555 .output()
556 .await;
557 let wait_out = Command::new(&self.cli)
558 .args(["wait", &rc.container_id])
559 .output()
560 .await;
561 if let Ok(out) = wait_out {
562 let code: i64 = String::from_utf8_lossy(&out.stdout)
563 .trim()
564 .parse()
565 .unwrap_or(-1);
566 final_containers[i].exit_code = Some(code);
567 }
568 }
569 }
570
571 let mut captured = String::new();
574 for rc in &started {
575 let logs_out = Command::new(&self.cli)
576 .args(["logs", &rc.container_id])
577 .output()
578 .await
579 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
580 captured.push_str(&format!("[{}] ", rc.name));
581 captured.push_str(&String::from_utf8_lossy(&logs_out.stdout));
582 captured.push_str(&String::from_utf8_lossy(&logs_out.stderr));
583 }
584
585 for rc in &started {
587 let _ = Command::new(&self.cli)
588 .args(["rm", "-f", &rc.container_id])
589 .output()
590 .await;
591 }
592 if network_created {
594 let _ = Command::new(&self.cli)
595 .args(["network", "rm", &network_name])
596 .output()
597 .await;
598 }
599 self.containers.write().remove(task_id);
600
601 self.forward_awslogs_if_configured(state, account_id, task_id, &captured);
606 let exit_code = wait_outcome.exit_code;
607 finalize_stopped_multi(
608 state,
609 account_id,
610 task_id,
611 &final_containers,
612 exit_code,
613 &captured,
614 wait_outcome.stop_code,
615 None,
616 );
617 self.deregister_lb_targets(state, account_id, task_id);
618 self.emit_state_change(
619 state,
620 account_id,
621 task_id,
622 "STOPPED",
623 Some((wait_outcome.stop_code, format!("Exit code {}", exit_code))),
624 );
625 Ok(())
626 }
627
628 async fn wait_for_task_exit_with_health(
637 &self,
638 state: &SharedEcsState,
639 account_id: &str,
640 task_id: &str,
641 started: &[RunningContainer],
642 ) -> Result<TaskExitOutcome, RuntimeError> {
643 let any_essential = started.iter().any(|c| c.essential);
644 let mut working: Vec<RunningContainer> = started.to_vec();
645 let mut first_exited: Option<usize> = None;
646 loop {
647 self.refresh_health_status(state, account_id, task_id, started)
652 .await;
653 for (i, rc) in started.iter().enumerate() {
654 if working[i].exit_code.is_some() {
655 continue;
656 }
657 let inspect = Command::new(&self.cli)
658 .args(["inspect", "-f", "{{.State.Running}}", &rc.container_id])
659 .output()
660 .await;
661 let running = match inspect {
662 Ok(out) if out.status.success() => {
663 String::from_utf8_lossy(&out.stdout).trim() == "true"
664 }
665 _ => false,
666 };
667 if running {
668 continue;
669 }
670 let wait_out = Command::new(&self.cli)
671 .args(["wait", &rc.container_id])
672 .output()
673 .await
674 .map_err(|e| RuntimeError::Wait(e.to_string()))?;
675 if !wait_out.status.success() {
676 let err = String::from_utf8_lossy(&wait_out.stderr).to_string();
677 return Err(RuntimeError::Wait(err));
678 }
679 let exit_code: i64 = String::from_utf8_lossy(&wait_out.stdout)
680 .trim()
681 .parse()
682 .unwrap_or(-1);
683 working[i].exit_code = Some(exit_code);
684 if first_exited.is_none() && (rc.essential || !any_essential) {
685 first_exited = Some(i);
686 }
687 }
688 if task_should_stop(&working) {
689 let idx = first_exited
690 .or_else(|| working.iter().position(|c| c.exit_code.is_some()))
691 .unwrap_or(0);
692 let exit_code = working[idx].exit_code.unwrap_or(-1);
693 return Ok(TaskExitOutcome {
694 exited_index: Some(idx),
695 exit_code,
696 stop_code: if any_essential {
697 "EssentialContainerExited"
698 } else {
699 "TaskCompleted"
700 },
701 });
702 }
703 sleep(Duration::from_millis(200)).await;
704 }
705 }
706
707 async fn refresh_health_status(
712 &self,
713 state: &SharedEcsState,
714 account_id: &str,
715 task_id: &str,
716 started: &[RunningContainer],
717 ) {
718 let mut updates: Vec<(String, String)> = Vec::with_capacity(started.len());
719 for rc in started {
720 let out = Command::new(&self.cli)
721 .args([
722 "inspect",
723 "-f",
724 "{{if .State.Health}}{{.State.Health.Status}}{{else}}{{end}}",
725 &rc.container_id,
726 ])
727 .output()
728 .await;
729 let status = match out {
730 Ok(o) if o.status.success() => {
731 let raw = String::from_utf8_lossy(&o.stdout).trim().to_string();
732 if raw.is_empty() {
733 "UNKNOWN".to_string()
736 } else {
737 docker_health_to_ecs(&raw).to_string()
738 }
739 }
740 _ => continue,
741 };
742 updates.push((rc.name.clone(), status));
743 }
744 if updates.is_empty() {
745 return;
746 }
747 let mut accounts = state.write();
748 let Some(s) = accounts.get_mut(account_id) else {
749 return;
750 };
751 let Some(task) = s.tasks.get_mut(task_id) else {
752 return;
753 };
754 for (name, status) in updates {
755 if let Some(c) = task.containers.iter_mut().find(|c| c.name == name) {
756 c.health_status = Some(status);
757 }
758 }
759 }
760
761 async fn lookup_image_digest(&self, pull_uri: &str) -> Option<String> {
766 let out = self
767 .cli_command()
768 .args([
769 "image",
770 "inspect",
771 "-f",
772 "{{index .RepoDigests 0}}",
773 pull_uri,
774 ])
775 .output()
776 .await
777 .ok()?;
778 if !out.status.success() {
779 return None;
780 }
781 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
782 if raw.is_empty() || raw == "<no value>" {
783 return None;
784 }
785 Some(
788 raw.rsplit_once('@')
789 .map(|(_, d)| d.to_string())
790 .unwrap_or(raw),
791 )
792 }
793
794 async fn wait_for_depends_on(
807 &self,
808 upstream: &RunningContainer,
809 condition: DependsOnCondition,
810 upstream_has_health_check: bool,
811 ) -> Result<(), RuntimeError> {
812 const WAIT_TIMEOUT: Duration = Duration::from_secs(120);
815 const POLL_INTERVAL: Duration = Duration::from_millis(200);
816
817 if matches!(condition, DependsOnCondition::Healthy) && !upstream_has_health_check {
822 return Ok(());
823 }
824
825 let deadline = std::time::Instant::now() + WAIT_TIMEOUT;
826 loop {
827 let inspect = inspect_container_state(&self.cli, &upstream.container_id).await;
828 if let Some(state) = inspect {
829 if condition_is_met(condition, &state) {
830 return Ok(());
831 }
832 if matches!(condition, DependsOnCondition::Success)
837 && state.exited
838 && state.exit_code != 0
839 {
840 return Err(RuntimeError::ContainerStart(format!(
841 "dependency on container {} ({}) failed: upstream exited with code {}",
842 upstream.name,
843 DependsOnCondition::Success.as_aws_str(),
844 state.exit_code,
845 )));
846 }
847 }
848 if std::time::Instant::now() >= deadline {
849 return Err(RuntimeError::ContainerStart(format!(
850 "timed out waiting for container {} to reach condition {}",
851 upstream.name,
852 condition.as_aws_str(),
853 )));
854 }
855 tokio::time::sleep(POLL_INTERVAL).await;
856 }
857 }
858
859 fn cleanup_partial_start(&self, started: &[RunningContainer]) {
863 let cli = self.cli.clone();
864 let ids: Vec<String> = started.iter().map(|c| c.container_id.clone()).collect();
865 tokio::spawn(async move {
866 for id in ids {
867 let _ = Command::new(&cli).args(["rm", "-f", &id]).output().await;
868 }
869 });
870 }
871
872 fn resolve_secret(&self, account_id: &str, value_from: &str) -> Option<String> {
877 if value_from.contains(":secret:") {
878 let state = self.secretsmanager_state.as_ref()?;
879 let accounts = state.read();
880 let sm = accounts.get(account_id)?;
881 let arn_tail = value_from.rsplit(":secret:").next()?;
885 let name = arn_tail
886 .rsplit_once('-')
887 .map(|(n, _)| n)
888 .unwrap_or(arn_tail);
889 let secret = sm.secrets.get(name).or_else(|| sm.secrets.get(arn_tail))?;
890 let version_id = secret.current_version_id.as_ref()?;
891 let v = secret.versions.get(version_id)?;
892 return v.secret_string.clone();
893 }
894 if value_from.contains(":parameter") {
895 let state = self.ssm_state.as_ref()?;
896 let accounts = state.read();
897 let ssm = accounts.get(account_id)?;
898 let after = value_from.rsplit(":parameter").next()?;
902 let name_with_slash = after.trim_start_matches('/');
903 return ssm
904 .parameters
905 .get(&format!("/{name_with_slash}"))
906 .or_else(|| ssm.parameters.get(name_with_slash))
907 .map(|p| p.value.clone());
908 }
909 None
910 }
911
912 fn emit_state_change(
916 &self,
917 state: &SharedEcsState,
918 account_id: &str,
919 task_id: &str,
920 last_status: &str,
921 stop: Option<(&str, String)>,
922 ) {
923 let Some(ref bus) = self.delivery_bus else {
924 return;
925 };
926 let Some(task_view) = snapshot_task(state, account_id, task_id) else {
927 return;
928 };
929 let mut detail = serde_json::json!({
930 "taskArn": task_view.task_arn,
931 "clusterArn": task_view.cluster_arn,
932 "lastStatus": last_status,
933 "desiredStatus": if last_status == "STOPPED" { "STOPPED" } else { "RUNNING" },
934 "launchType": task_view.launch_type,
935 "group": task_view.group,
936 "taskDefinitionArn": task_view.task_definition_arn,
937 "containers": task_view.containers,
938 });
939 if let Some((code, reason)) = stop {
940 detail["stopCode"] = code.into();
941 detail["stoppedReason"] = reason.into();
942 }
943 bus.put_event_to_eventbridge(
944 "aws.ecs",
945 "ECS Task State Change",
946 &detail.to_string(),
947 "default",
948 );
949 }
950
951 fn forward_awslogs_if_configured(
955 &self,
956 state: &SharedEcsState,
957 account_id: &str,
958 task_id: &str,
959 captured: &str,
960 ) {
961 let Some(ref logs) = self.logs_state else {
962 return;
963 };
964 let (cfg, task_region) = {
967 let accounts = state.read();
968 let Some(s) = accounts.get(account_id) else {
969 return;
970 };
971 let Some(task) = s.tasks.get(task_id) else {
972 return;
973 };
974 let Some(ref cfg) = task.awslogs else {
975 return;
976 };
977 (cfg.clone(), s.region.clone())
978 };
979 if captured.is_empty() {
980 return;
981 }
982 let now = Utc::now().timestamp_millis();
983 let stream_name = cfg.stream_name(task_id);
984 let events: Vec<IngestEvent> = captured
985 .lines()
986 .enumerate()
987 .map(|(i, line)| IngestEvent {
988 timestamp_ms: now.saturating_add(i as i64),
992 message: line.to_string(),
993 })
994 .collect();
995 append_events(
996 logs,
997 account_id,
998 &task_region,
999 &cfg.group,
1000 &stream_name,
1001 &events,
1002 );
1003 }
1004
1005 pub async fn stop_task(&self, task_id: &str, reason: &str) -> bool {
1010 let containers = self.containers.read().get(task_id).cloned();
1011 let Some(list) = containers else {
1012 return false;
1013 };
1014 if list.is_empty() {
1015 return false;
1016 }
1017 for (_name, id) in &list {
1019 let _ = Command::new(&self.cli)
1020 .args(["stop", "--time", "10", id])
1021 .output()
1022 .await;
1023 }
1024 tracing::info!(task = %task_id, reason = %reason, "ecs task stop requested");
1025 true
1026 }
1027
1028 pub async fn stop_all(&self) {
1032 let ids: Vec<String> = self
1033 .containers
1034 .read()
1035 .values()
1036 .flat_map(|list| list.iter().map(|(_, id)| id.clone()))
1037 .collect();
1038 for id in ids {
1039 let _ = Command::new(&self.cli).args(["kill", &id]).output().await;
1040 let _ = Command::new(&self.cli).args(["rm", &id]).output().await;
1041 }
1042 self.containers.write().clear();
1043 }
1044}
1045
1046#[derive(Clone, Debug)]
1048pub(crate) struct ContainerPlan {
1049 pub(crate) container_name: String,
1050 pub(crate) image: String,
1051 pub(crate) env: Vec<(String, String)>,
1052 pub(crate) entry_point: Vec<String>,
1053 pub(crate) command: Vec<String>,
1054 pub(crate) secrets_refs: Vec<(String, String)>,
1055 pub(crate) essential: bool,
1056 pub(crate) has_task_role: bool,
1057 pub(crate) port_mappings: Vec<PortMapping>,
1062 pub(crate) network_mode: Option<String>,
1067 pub(crate) depends_on: Vec<DependsOn>,
1075 pub(crate) health_check: Option<HealthCheckSpec>,
1082 pub(crate) volume_mounts: Vec<VolumeMount>,
1087 pub(crate) ulimits: Vec<Ulimit>,
1090 pub(crate) linux_parameters: Option<LinuxParameters>,
1094 pub(crate) stop_timeout: Option<u32>,
1096 pub(crate) user: Option<String>,
1098 pub(crate) working_directory: Option<String>,
1100 pub(crate) tty: bool,
1102 pub(crate) interactive: bool,
1104 pub(crate) readonly_rootfs: bool,
1106}
1107
1108#[derive(Clone, Debug, PartialEq, Eq)]
1114pub(crate) struct DependsOn {
1115 pub container_name: String,
1116 pub condition: DependsOnCondition,
1117}
1118
1119#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1123pub(crate) enum DependsOnCondition {
1124 Start,
1127 Complete,
1129 Success,
1131 Healthy,
1135}
1136
1137impl DependsOnCondition {
1138 pub fn parse(raw: &str) -> Option<Self> {
1142 match raw {
1143 "START" => Some(Self::Start),
1144 "COMPLETE" => Some(Self::Complete),
1145 "SUCCESS" => Some(Self::Success),
1146 "HEALTHY" => Some(Self::Healthy),
1147 _ => None,
1148 }
1149 }
1150
1151 pub fn as_aws_str(self) -> &'static str {
1155 match self {
1156 Self::Start => "START",
1157 Self::Complete => "COMPLETE",
1158 Self::Success => "SUCCESS",
1159 Self::Healthy => "HEALTHY",
1160 }
1161 }
1162}
1163
1164#[derive(Clone, Debug, PartialEq, Eq)]
1170pub(crate) struct HealthCheckSpec {
1171 pub command: Vec<String>,
1177 pub interval_seconds: u32,
1178 pub timeout_seconds: u32,
1179 pub retries: u32,
1180 pub start_period_seconds: u32,
1181}
1182
1183#[derive(Clone, Debug, PartialEq, Eq)]
1187pub(crate) struct PortMapping {
1188 pub container_port: u16,
1189 pub host_port: u16,
1192 pub protocol: String,
1194}
1195
1196#[derive(Clone, Debug, PartialEq, Eq)]
1218pub(crate) struct VolumeMount {
1219 pub source: String,
1223 pub container_path: String,
1226 pub read_only: bool,
1230}
1231
1232#[derive(Clone, Debug, PartialEq, Eq)]
1234pub(crate) struct Ulimit {
1235 pub name: String,
1236 pub soft_limit: i32,
1237 pub hard_limit: i32,
1238}
1239
1240#[derive(Clone, Debug, PartialEq, Eq)]
1242pub(crate) struct Device {
1243 pub host_path: String,
1244 pub container_path: String,
1245 pub permissions: String,
1246}
1247
1248#[derive(Clone, Debug, PartialEq, Eq)]
1250pub(crate) struct Sysctl {
1251 pub name: String,
1252 pub value: String,
1253}
1254
1255#[derive(Clone, Debug, PartialEq, Eq, Default)]
1257pub(crate) struct LinuxParameters {
1258 pub capabilities_add: Vec<String>,
1259 pub capabilities_drop: Vec<String>,
1260 pub devices: Vec<Device>,
1261 pub init_process_enabled: bool,
1262 pub shared_memory_size: Option<i32>,
1263 pub sysctls: Vec<Sysctl>,
1264 pub tmpfs: Vec<Tmpfs>,
1265 pub privileged: bool,
1266}
1267
1268#[derive(Clone, Debug, PartialEq, Eq)]
1270pub(crate) struct Tmpfs {
1271 pub container_path: String,
1272 pub size: i32,
1273 pub mount_options: Vec<String>,
1274}
1275
1276#[derive(Clone, Debug)]
1277struct ResolvedContainerPlan {
1278 plan: ContainerPlan,
1279 env: Vec<(String, String)>,
1280}
1281
1282#[derive(Clone, Debug)]
1284struct TaskExitOutcome {
1285 exited_index: Option<usize>,
1289 exit_code: i64,
1290 stop_code: &'static str,
1291}
1292
1293#[derive(Clone, Debug)]
1296pub(crate) struct RunningContainer {
1297 pub(crate) name: String,
1298 pub(crate) container_id: String,
1299 pub(crate) essential: bool,
1300 pub(crate) exit_code: Option<i64>,
1301 pub(crate) network_bindings: Vec<serde_json::Value>,
1305 pub(crate) image_digest: Option<String>,
1310}
1311
1312pub(crate) fn task_should_stop(containers: &[RunningContainer]) -> bool {
1317 if containers.is_empty() {
1318 return true;
1319 }
1320 let any_essential_exited = containers
1321 .iter()
1322 .any(|c| c.essential && c.exit_code.is_some());
1323 if any_essential_exited {
1324 return true;
1325 }
1326 containers.iter().all(|c| c.exit_code.is_some())
1327}
1328
1329fn build_container_plans(
1330 state: &SharedEcsState,
1331 account_id: &str,
1332 task_id: &str,
1333 _server_port: u16,
1334) -> Result<Vec<ContainerPlan>, RuntimeError> {
1335 let accounts = state.read();
1336 let s = accounts
1337 .get(account_id)
1338 .ok_or_else(|| RuntimeError::ContainerStart("account missing".into()))?;
1339 let task = s
1340 .tasks
1341 .get(task_id)
1342 .ok_or_else(|| RuntimeError::ContainerStart("task missing".into()))?;
1343 if task.containers.is_empty() {
1344 return Err(RuntimeError::ContainerStart(
1345 "task has no containers".into(),
1346 ));
1347 }
1348 let has_task_role = task.task_role_arn.is_some();
1349 let task_def = s
1350 .task_definitions
1351 .get(&task.family)
1352 .and_then(|revs| revs.get(&task.revision));
1353 let network_mode = task_def.and_then(|td| td.network_mode.clone());
1354 let volumes_by_name: std::collections::HashMap<String, &serde_json::Value> = task_def
1359 .map(|td| {
1360 td.volumes
1361 .iter()
1362 .filter_map(|v| {
1363 let name = v.get("name").and_then(|n| n.as_str())?;
1364 Some((name.to_string(), v))
1365 })
1366 .collect()
1367 })
1368 .unwrap_or_default();
1369 let mut plans = Vec::with_capacity(task.containers.len());
1370 for container in &task.containers {
1371 let def = find_container_definition(s, &task.family, task.revision, &container.name);
1372 let secrets_refs = def
1373 .as_ref()
1374 .and_then(|d| d.get("secrets").and_then(|v| v.as_array()).cloned())
1375 .map(|arr| {
1376 arr.iter()
1377 .filter_map(|e| {
1378 let name = e.get("name").and_then(|v| v.as_str())?.to_string();
1379 let value_from = e.get("valueFrom").and_then(|v| v.as_str())?.to_string();
1380 Some((name, value_from))
1381 })
1382 .collect::<Vec<_>>()
1383 })
1384 .unwrap_or_default();
1385 let str_array = |key: &str| -> Vec<String> {
1386 def.as_ref()
1387 .and_then(|d| d.get(key).and_then(|v| v.as_array()).cloned())
1388 .map(|arr| {
1389 arr.iter()
1390 .filter_map(|v| v.as_str().map(String::from))
1391 .collect::<Vec<_>>()
1392 })
1393 .unwrap_or_default()
1394 };
1395 let env = def
1396 .as_ref()
1397 .and_then(|d| d.get("environment").and_then(|v| v.as_array()).cloned())
1398 .map(|arr| {
1399 arr.iter()
1400 .filter_map(|e| {
1401 let k = e.get("name").and_then(|v| v.as_str())?;
1402 let v = e.get("value").and_then(|v| v.as_str()).unwrap_or("");
1403 Some((k.to_string(), v.to_string()))
1404 })
1405 .collect::<Vec<_>>()
1406 })
1407 .unwrap_or_default();
1408 let port_mappings = def
1409 .as_ref()
1410 .and_then(|d| d.get("portMappings").and_then(|v| v.as_array()).cloned())
1411 .map(|arr| {
1412 arr.iter()
1413 .filter_map(parse_port_mapping)
1414 .collect::<Vec<_>>()
1415 })
1416 .unwrap_or_default();
1417 let depends_on = def
1418 .as_ref()
1419 .and_then(|d| d.get("dependsOn").and_then(|v| v.as_array()).cloned())
1420 .map(|arr| {
1421 arr.iter()
1422 .filter_map(parse_depends_on_entry)
1423 .collect::<Vec<_>>()
1424 })
1425 .unwrap_or_default();
1426 let health_check = def
1427 .as_ref()
1428 .and_then(|d| d.get("healthCheck"))
1429 .and_then(parse_health_check);
1430 let volume_mounts = def
1431 .as_ref()
1432 .and_then(|d| d.get("mountPoints").and_then(|v| v.as_array()).cloned())
1433 .map(|arr| {
1434 arr.iter()
1435 .filter_map(|mp| resolve_mount_point(mp, &volumes_by_name))
1436 .collect::<Vec<_>>()
1437 })
1438 .unwrap_or_default();
1439 let ulimits = def
1440 .as_ref()
1441 .and_then(|d| d.get("ulimits").and_then(|v| v.as_array()).cloned())
1442 .map(|arr| arr.iter().filter_map(parse_ulimit).collect::<Vec<_>>())
1443 .unwrap_or_default();
1444 let linux_parameters = def
1445 .as_ref()
1446 .and_then(|d| d.get("linuxParameters"))
1447 .and_then(parse_linux_parameters);
1448 let stop_timeout = def.as_ref().and_then(|d| {
1449 d.get("stopTimeout")
1450 .and_then(|v| v.as_u64())
1451 .map(|n| n as u32)
1452 });
1453 let user = def
1454 .as_ref()
1455 .and_then(|d| d.get("user").and_then(|v| v.as_str()).map(String::from));
1456 let working_directory = def.as_ref().and_then(|d| {
1457 d.get("workingDirectory")
1458 .and_then(|v| v.as_str())
1459 .map(String::from)
1460 });
1461 let tty = def
1462 .as_ref()
1463 .and_then(|d| d.get("tty").and_then(|v| v.as_bool()))
1464 .unwrap_or(false);
1465 let interactive = def
1466 .as_ref()
1467 .and_then(|d| d.get("interactive").and_then(|v| v.as_bool()))
1468 .unwrap_or(false);
1469 let readonly_rootfs = def
1470 .as_ref()
1471 .and_then(|d| d.get("readonlyRootFilesystem").and_then(|v| v.as_bool()))
1472 .unwrap_or(false);
1473 plans.push(ContainerPlan {
1474 container_name: container.name.clone(),
1475 image: container.image.clone(),
1476 env,
1477 entry_point: str_array("entryPoint"),
1478 command: str_array("command"),
1479 secrets_refs,
1480 essential: container.essential,
1481 has_task_role,
1482 port_mappings,
1483 network_mode: network_mode.clone(),
1484 depends_on,
1485 health_check,
1486 volume_mounts,
1487 ulimits,
1488 linux_parameters,
1489 stop_timeout,
1490 user,
1491 working_directory,
1492 tty,
1493 interactive,
1494 readonly_rootfs,
1495 });
1496 }
1497 let plans = topo_sort_plans(plans);
1498 Ok(plans)
1499}
1500
1501fn resolve_mount_point(
1509 mount_point: &serde_json::Value,
1510 volumes_by_name: &std::collections::HashMap<String, &serde_json::Value>,
1511) -> Option<VolumeMount> {
1512 let container_path = mount_point
1513 .get("containerPath")
1514 .and_then(|v| v.as_str())?
1515 .to_string();
1516 let source_volume = mount_point.get("sourceVolume").and_then(|v| v.as_str())?;
1517 let read_only = mount_point
1518 .get("readOnly")
1519 .and_then(|v| v.as_bool())
1520 .unwrap_or(false);
1521 let volume = volumes_by_name.get(source_volume)?;
1522 let source = resolve_volume_source(source_volume, volume)?;
1523 Some(VolumeMount {
1524 source,
1525 container_path,
1526 read_only,
1527 })
1528}
1529
1530fn resolve_volume_source(name: &str, volume: &serde_json::Value) -> Option<String> {
1549 if let Some(host) = volume.get("host") {
1550 if let Some(path) = host.get("sourcePath").and_then(|v| v.as_str()) {
1551 if !path.is_empty() {
1554 ensure_dir_exists(path);
1555 return Some(path.to_string());
1556 }
1557 }
1558 }
1559 if let Some(efs) = volume.get("efsVolumeConfiguration") {
1560 let fs_id = efs.get("fileSystemId").and_then(|v| v.as_str())?;
1561 let root = efs
1562 .get("rootDirectory")
1563 .and_then(|v| v.as_str())
1564 .unwrap_or("/");
1565 let path = stub_dir_for("efs", fs_id, root);
1566 ensure_dir_exists(&path);
1567 return Some(path);
1568 }
1569 if let Some(fsx) = volume.get("fsxWindowsFileServerVolumeConfiguration") {
1570 let fs_id = fsx.get("fileSystemId").and_then(|v| v.as_str())?;
1571 let root = fsx
1572 .get("rootDirectory")
1573 .and_then(|v| v.as_str())
1574 .unwrap_or("/");
1575 let path = stub_dir_for("fsx", fs_id, root);
1576 ensure_dir_exists(&path);
1577 return Some(path);
1578 }
1579 if volume.get("dockerVolumeConfiguration").is_some() {
1580 return Some(name.to_string());
1583 }
1584 Some(name.to_string())
1586}
1587
1588fn stub_dir_for(kind: &str, fs_id: &str, root: &str) -> String {
1593 let trimmed = root.trim_start_matches('/');
1594 if trimmed.is_empty() {
1595 format!("/tmp/fakecloud/{kind}/{fs_id}")
1596 } else {
1597 format!("/tmp/fakecloud/{kind}/{fs_id}/{trimmed}")
1598 }
1599}
1600
1601fn ensure_dir_exists(path: &str) {
1606 let _ = std::fs::create_dir_all(path);
1607}
1608
1609fn parse_depends_on_entry(value: &serde_json::Value) -> Option<DependsOn> {
1614 let container_name = value
1615 .get("containerName")
1616 .and_then(|v| v.as_str())?
1617 .to_string();
1618 let raw_condition = value.get("condition").and_then(|v| v.as_str())?;
1619 let condition = DependsOnCondition::parse(raw_condition)?;
1620 Some(DependsOn {
1621 container_name,
1622 condition,
1623 })
1624}
1625
1626fn topo_sort_plans(plans: Vec<ContainerPlan>) -> Vec<ContainerPlan> {
1637 use std::collections::{HashMap, HashSet};
1638 let names: HashSet<String> = plans.iter().map(|p| p.container_name.clone()).collect();
1639 let index: HashMap<String, usize> = plans
1640 .iter()
1641 .enumerate()
1642 .map(|(i, p)| (p.container_name.clone(), i))
1643 .collect();
1644 let mut in_degree: Vec<usize> = plans
1649 .iter()
1650 .map(|p| {
1651 p.depends_on
1652 .iter()
1653 .filter(|d| names.contains(&d.container_name))
1654 .count()
1655 })
1656 .collect();
1657 let mut dependants: Vec<Vec<usize>> = vec![Vec::new(); plans.len()];
1659 for (i, p) in plans.iter().enumerate() {
1660 for d in &p.depends_on {
1661 if let Some(&di) = index.get(&d.container_name) {
1662 dependants[di].push(i);
1663 }
1664 }
1665 }
1666 let mut ordered: Vec<ContainerPlan> = Vec::with_capacity(plans.len());
1667 let mut emitted: Vec<bool> = vec![false; plans.len()];
1668 loop {
1669 let next = (0..plans.len()).find(|&i| !emitted[i] && in_degree[i] == 0);
1672 match next {
1673 Some(i) => {
1674 emitted[i] = true;
1675 ordered.push(plans[i].clone());
1676 for &di in &dependants[i] {
1677 if in_degree[di] > 0 {
1678 in_degree[di] -= 1;
1679 }
1680 }
1681 }
1682 None => break,
1683 }
1684 }
1685 for (i, p) in plans.into_iter().enumerate() {
1687 if !emitted[i] {
1688 ordered.push(p);
1689 }
1690 }
1691 ordered
1692}
1693
1694pub(crate) fn find_depends_on_cycle(
1703 container_definitions: &[serde_json::Value],
1704) -> Option<(String, String)> {
1705 use std::collections::HashMap;
1706
1707 let names: Vec<String> = container_definitions
1708 .iter()
1709 .filter_map(|c| c.get("name").and_then(|n| n.as_str()).map(String::from))
1710 .collect();
1711 let index: HashMap<&str, usize> = names
1712 .iter()
1713 .enumerate()
1714 .map(|(i, n)| (n.as_str(), i))
1715 .collect();
1716
1717 let mut adj: Vec<Vec<usize>> = vec![Vec::new(); names.len()];
1718 for (i, cd) in container_definitions.iter().enumerate() {
1719 if i >= names.len() {
1720 continue;
1721 }
1722 let Some(deps) = cd.get("dependsOn").and_then(|v| v.as_array()) else {
1723 continue;
1724 };
1725 for d in deps {
1726 let Some(target) = d.get("containerName").and_then(|v| v.as_str()) else {
1727 continue;
1728 };
1729 if let Some(&j) = index.get(target) {
1730 adj[i].push(j);
1732 }
1733 }
1734 }
1735
1736 let mut state = vec![0u8; names.len()];
1740 let mut stack: Vec<(usize, usize)> = Vec::new();
1741 for start in 0..names.len() {
1742 if state[start] != 0 {
1743 continue;
1744 }
1745 stack.clear();
1746 stack.push((start, 0));
1747 state[start] = 1;
1748 while let Some(&(node, next_edge)) = stack.last() {
1749 if next_edge < adj[node].len() {
1750 let nb = adj[node][next_edge];
1751 stack.last_mut().unwrap().1 += 1;
1752 match state[nb] {
1753 0 => {
1754 state[nb] = 1;
1755 stack.push((nb, 0));
1756 }
1757 1 => {
1758 return Some((names[node].clone(), names[nb].clone()));
1759 }
1760 _ => {}
1761 }
1762 } else {
1763 state[node] = 2;
1764 stack.pop();
1765 }
1766 }
1767 }
1768 None
1769}
1770
1771#[derive(Debug, Clone)]
1775struct InspectedState {
1776 started: bool,
1777 exited: bool,
1778 exit_code: i64,
1779 health: Option<String>,
1780}
1781
1782async fn inspect_container_state(cli: &str, container_id: &str) -> Option<InspectedState> {
1786 let format =
1789 "{{.State.Status}}|{{.State.Running}}|{{.State.ExitCode}}|{{if .State.Health}}{{.State.Health.Status}}{{else}}<none>{{end}}";
1790 let out = Command::new(cli)
1791 .args(["inspect", "-f", format, container_id])
1792 .output()
1793 .await
1794 .ok()?;
1795 if !out.status.success() {
1796 return None;
1797 }
1798 let raw = String::from_utf8_lossy(&out.stdout).trim().to_string();
1799 let parts: Vec<&str> = raw.split('|').collect();
1800 if parts.len() < 4 {
1801 return None;
1802 }
1803 let status = parts[0];
1804 let running = parts[1] == "true";
1805 let exit_code: i64 = parts[2].parse().unwrap_or(-1);
1806 let health = match parts[3] {
1807 "<none>" | "" => None,
1808 other => Some(other.to_string()),
1809 };
1810 let started = running || status == "exited" || status == "running" || status == "dead";
1814 let exited = status == "exited" || status == "dead";
1815 Some(InspectedState {
1816 started,
1817 exited,
1818 exit_code,
1819 health,
1820 })
1821}
1822
1823fn condition_is_met(condition: DependsOnCondition, state: &InspectedState) -> bool {
1827 match condition {
1828 DependsOnCondition::Start => state.started,
1829 DependsOnCondition::Complete => state.exited,
1830 DependsOnCondition::Success => state.exited && state.exit_code == 0,
1831 DependsOnCondition::Healthy => state.health.as_deref() == Some("healthy"),
1832 }
1833}
1834
1835#[cfg(test)]
1839pub(crate) fn __test_parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
1840 parse_port_mapping(value)
1841}
1842
1843fn parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
1849 let cmd_arr = value.get("command")?.as_array()?;
1850 let command: Vec<String> = cmd_arr
1851 .iter()
1852 .filter_map(|v| v.as_str().map(String::from))
1853 .collect();
1854 if command.is_empty() {
1855 return None;
1856 }
1857 if command.first().map(|s| s.as_str()) == Some("NONE") {
1858 return None;
1859 }
1860 let read_u32 = |key: &str, default: u32| -> u32 {
1861 value
1862 .get(key)
1863 .and_then(|v| v.as_i64())
1864 .filter(|n| (0..=u32::MAX as i64).contains(n))
1865 .map(|n| n as u32)
1866 .unwrap_or(default)
1867 };
1868 Some(HealthCheckSpec {
1869 command,
1870 interval_seconds: read_u32("interval", 30),
1871 timeout_seconds: read_u32("timeout", 5),
1872 retries: read_u32("retries", 3),
1873 start_period_seconds: read_u32("startPeriod", 0),
1874 })
1875}
1876
1877fn parse_ulimit(value: &serde_json::Value) -> Option<Ulimit> {
1879 let name = value.get("name").and_then(|v| v.as_str())?;
1880 let soft = value
1881 .get("softLimit")
1882 .and_then(|v| v.as_i64())
1883 .filter(|n| *n >= 0)? as i32;
1884 let hard = value
1885 .get("hardLimit")
1886 .and_then(|v| v.as_i64())
1887 .filter(|n| *n >= 0)? as i32;
1888 Some(Ulimit {
1889 name: name.to_string(),
1890 soft_limit: soft,
1891 hard_limit: hard,
1892 })
1893}
1894
1895fn parse_linux_parameters(value: &serde_json::Value) -> Option<LinuxParameters> {
1897 let mut lp = LinuxParameters::default();
1898 if let Some(arr) = value
1899 .get("capabilities")
1900 .and_then(|v| v.get("add"))
1901 .and_then(|v| v.as_array())
1902 {
1903 lp.capabilities_add = arr
1904 .iter()
1905 .filter_map(|v| v.as_str().map(String::from))
1906 .collect();
1907 }
1908 if let Some(arr) = value
1909 .get("capabilities")
1910 .and_then(|v| v.get("drop"))
1911 .and_then(|v| v.as_array())
1912 {
1913 lp.capabilities_drop = arr
1914 .iter()
1915 .filter_map(|v| v.as_str().map(String::from))
1916 .collect();
1917 }
1918 if let Some(arr) = value.get("devices").and_then(|v| v.as_array()) {
1919 lp.devices = arr.iter().filter_map(parse_device).collect();
1920 }
1921 lp.init_process_enabled = value
1922 .get("initProcessEnabled")
1923 .and_then(|v| v.as_bool())
1924 .unwrap_or(false);
1925 lp.shared_memory_size = value
1926 .get("sharedMemorySize")
1927 .and_then(|v| v.as_i64())
1928 .map(|n| n as i32);
1929 if let Some(arr) = value.get("sysctl").and_then(|v| v.as_array()) {
1930 lp.sysctls = arr.iter().filter_map(parse_sysctl).collect();
1931 }
1932 if let Some(arr) = value.get("tmpfs").and_then(|v| v.as_array()) {
1933 lp.tmpfs = arr.iter().filter_map(parse_tmpfs).collect();
1934 }
1935 lp.privileged = value
1936 .get("privileged")
1937 .and_then(|v| v.as_bool())
1938 .unwrap_or(false);
1939 Some(lp)
1940}
1941
1942fn parse_device(value: &serde_json::Value) -> Option<Device> {
1943 let host_path = value.get("hostPath").and_then(|v| v.as_str())?.to_string();
1944 let container_path = value
1945 .get("containerPath")
1946 .and_then(|v| v.as_str())?
1947 .to_string();
1948 let permissions = value
1949 .get("permissions")
1950 .and_then(|v| v.as_str())
1951 .unwrap_or("rwm")
1952 .to_string();
1953 Some(Device {
1954 host_path,
1955 container_path,
1956 permissions,
1957 })
1958}
1959
1960fn parse_sysctl(value: &serde_json::Value) -> Option<Sysctl> {
1961 let name = value.get("name").and_then(|v| v.as_str())?.to_string();
1962 let value_str = value.get("value").and_then(|v| v.as_str())?.to_string();
1963 Some(Sysctl {
1964 name,
1965 value: value_str,
1966 })
1967}
1968
1969fn parse_tmpfs(value: &serde_json::Value) -> Option<Tmpfs> {
1970 let container_path = value
1971 .get("containerPath")
1972 .and_then(|v| v.as_str())?
1973 .to_string();
1974 let size = value
1975 .get("size")
1976 .and_then(|v| v.as_i64())
1977 .filter(|n| *n > 0)? as i32;
1978 let mount_options = value
1979 .get("mountOptions")
1980 .and_then(|v| v.as_array())
1981 .map(|arr| {
1982 arr.iter()
1983 .filter_map(|v| v.as_str().map(String::from))
1984 .collect()
1985 })
1986 .unwrap_or_default();
1987 Some(Tmpfs {
1988 container_path,
1989 size,
1990 mount_options,
1991 })
1992}
1993
1994pub(crate) fn render_health_flags(hc: &HealthCheckSpec) -> Vec<String> {
2001 if hc.command.len() < 2 {
2002 return Vec::new();
2003 }
2004 let cmd_kind = hc.command[0].as_str();
2005 if cmd_kind != "CMD" && cmd_kind != "CMD-SHELL" {
2006 return Vec::new();
2007 }
2008 let cmd_string = hc.command[1..].join(" ");
2009 vec![
2010 "--health-cmd".into(),
2011 cmd_string,
2012 format!("--health-interval={}s", hc.interval_seconds),
2013 format!("--health-timeout={}s", hc.timeout_seconds),
2014 format!("--health-retries={}", hc.retries),
2015 format!("--health-start-period={}s", hc.start_period_seconds),
2016 ]
2017}
2018
2019#[cfg(test)]
2023pub(crate) fn __test_parse_health_check(value: &serde_json::Value) -> Option<HealthCheckSpec> {
2024 parse_health_check(value)
2025}
2026
2027pub(crate) fn docker_health_to_ecs(raw: &str) -> &'static str {
2033 match raw.trim().to_ascii_lowercase().as_str() {
2034 "healthy" => "HEALTHY",
2035 "unhealthy" => "UNHEALTHY",
2036 _ => "UNKNOWN",
2037 }
2038}
2039
2040fn parse_port_mapping(value: &serde_json::Value) -> Option<PortMapping> {
2044 let container_port = value
2045 .get("containerPort")
2046 .and_then(|v| v.as_i64())
2047 .filter(|n| (0..=u16::MAX as i64).contains(n))? as u16;
2048 let host_port_raw = value
2049 .get("hostPort")
2050 .and_then(|v| v.as_i64())
2051 .filter(|n| (0..=u16::MAX as i64).contains(n))
2052 .map(|n| n as u16)
2053 .unwrap_or(0);
2054 let host_port = if host_port_raw == 0 {
2055 container_port
2056 } else {
2057 host_port_raw
2058 };
2059 let protocol = value
2060 .get("protocol")
2061 .and_then(|v| v.as_str())
2062 .map(|s| s.to_ascii_lowercase())
2063 .unwrap_or_else(|| "tcp".to_string());
2064 Some(PortMapping {
2065 container_port,
2066 host_port,
2067 protocol,
2068 })
2069}
2070
2071pub(crate) fn build_run_argv(
2077 plan: &ContainerPlan,
2078 env: &[(String, String)],
2079 task_id: &str,
2080 host_ip: &str,
2081 run_image: &str,
2082) -> Vec<String> {
2083 let mut argv: Vec<String> = Vec::new();
2084 argv.push("run".into());
2085 argv.push("-d".into());
2086 argv.push("--name".into());
2087 argv.push(format!("{}-{}", task_id, plan.container_name));
2088 argv.push("--label".into());
2089 argv.push(format!("fakecloud-ecs-task={}", task_id));
2090 argv.push("--label".into());
2091 argv.push(format!("fakecloud-ecs-container={}", plan.container_name));
2092 argv.push("--add-host".into());
2093 argv.push(format!("host.docker.internal:{}", host_ip));
2094 if plan.network_mode.as_deref() == Some("awsvpc") {
2095 argv.push("--network".into());
2096 argv.push(format!("fakecloud-ecs-{}", task_id));
2097 }
2098 let publish_ports = plan.network_mode.as_deref() != Some("awsvpc");
2102 if publish_ports {
2103 for pm in &plan.port_mappings {
2104 argv.push("--publish".into());
2105 argv.push(format!(
2106 "{}:{}/{}",
2107 pm.container_port, pm.host_port, pm.protocol
2108 ));
2109 }
2110 }
2111 if let Some(ref hc) = plan.health_check {
2112 argv.extend(render_health_flags(hc));
2113 }
2114 for (k, v) in env {
2115 let transformed = v
2116 .replace("http://127.0.0.1:", "http://host.docker.internal:")
2117 .replace("https://127.0.0.1:", "https://host.docker.internal:")
2118 .replace("http://localhost:", "http://host.docker.internal:")
2119 .replace("https://localhost:", "https://host.docker.internal:");
2120 argv.push("-e".into());
2121 argv.push(format!("{}={}", k, transformed));
2122 }
2123 for vm in &plan.volume_mounts {
2128 argv.push("-v".into());
2129 let suffix = if vm.read_only { ":ro" } else { "" };
2130 argv.push(format!("{}:{}{}", vm.source, vm.container_path, suffix));
2131 }
2132 for ul in &plan.ulimits {
2133 argv.push("--ulimit".into());
2134 argv.push(format!("{}={}:{}", ul.name, ul.soft_limit, ul.hard_limit));
2135 }
2136 if let Some(ref lp) = plan.linux_parameters {
2137 for cap in &lp.capabilities_add {
2138 argv.push("--cap-add".into());
2139 argv.push(cap.clone());
2140 }
2141 for cap in &lp.capabilities_drop {
2142 argv.push("--cap-drop".into());
2143 argv.push(cap.clone());
2144 }
2145 for dev in &lp.devices {
2146 argv.push("--device".into());
2147 argv.push(format!(
2148 "{}:{}{}",
2149 dev.host_path, dev.container_path, dev.permissions
2150 ));
2151 }
2152 if lp.init_process_enabled {
2153 argv.push("--init".into());
2154 }
2155 if let Some(size) = lp.shared_memory_size {
2156 argv.push("--shm-size".into());
2157 argv.push(format!("{}m", size));
2158 }
2159 for sys in &lp.sysctls {
2160 argv.push("--sysctl".into());
2161 argv.push(format!("{}={}", sys.name, sys.value));
2162 }
2163 for tmp in &lp.tmpfs {
2164 let mut opts = tmp.mount_options.join(",");
2165 if !opts.is_empty() {
2166 opts = format!(",{}", opts);
2167 }
2168 argv.push("--tmpfs".into());
2169 argv.push(format!("{}:size={}M{}", tmp.container_path, tmp.size, opts));
2170 }
2171 if lp.privileged {
2172 argv.push("--privileged".into());
2173 }
2174 }
2175 if let Some(timeout) = plan.stop_timeout {
2176 argv.push("--stop-timeout".into());
2177 argv.push(format!("{}", timeout));
2178 }
2179 if let Some(ref user) = plan.user {
2180 argv.push("--user".into());
2181 argv.push(user.clone());
2182 }
2183 if let Some(ref wd) = plan.working_directory {
2184 argv.push("--workdir".into());
2185 argv.push(wd.clone());
2186 }
2187 if plan.tty {
2188 argv.push("--tty".into());
2189 }
2190 if plan.interactive {
2191 argv.push("--interactive".into());
2192 }
2193 if plan.readonly_rootfs {
2194 argv.push("--read-only".into());
2195 }
2196 if let Some(first) = plan.entry_point.first() {
2197 argv.push("--entrypoint".into());
2198 argv.push(first.clone());
2199 }
2200 argv.push(run_image.to_string());
2201 for arg in plan.entry_point.iter().skip(1) {
2202 argv.push(arg.clone());
2203 }
2204 for arg in &plan.command {
2205 argv.push(arg.clone());
2206 }
2207 argv
2208}
2209
2210pub(crate) fn network_bindings_for(plan: &ContainerPlan) -> Vec<serde_json::Value> {
2214 if plan.network_mode.as_deref() == Some("awsvpc") {
2215 return Vec::new();
2216 }
2217 plan.port_mappings
2218 .iter()
2219 .map(|pm| {
2220 serde_json::json!({
2221 "bindIP": "0.0.0.0",
2222 "containerPort": pm.container_port,
2223 "hostPort": pm.host_port,
2224 "protocol": pm.protocol,
2225 })
2226 })
2227 .collect()
2228}
2229
2230#[allow(clippy::type_complexity)]
2234pub(crate) fn compute_elbv2_targets(
2235 ecs_state: &crate::state::EcsState,
2236 task: &crate::state::Task,
2237) -> Vec<(String, Vec<(String, Option<i64>)>)> {
2238 let mut result = Vec::new();
2239 let Some(group) = task.group.as_deref() else {
2240 return result;
2241 };
2242 let service_name = group.strip_prefix("service:").unwrap_or(group);
2243 let key = crate::state::EcsState::service_key(&task.cluster_name, service_name);
2244 let Some(service) = ecs_state.services.get(&key) else {
2245 return result;
2246 };
2247
2248 let network_mode = ecs_state
2249 .task_definitions
2250 .get(&task.family)
2251 .and_then(|revs| revs.get(&task.revision))
2252 .and_then(|td| td.network_mode.as_deref());
2253
2254 for lb in &service.load_balancers {
2255 let tg_arn = lb.get("targetGroupArn").and_then(|v| v.as_str());
2256 let container_name = lb.get("containerName").and_then(|v| v.as_str());
2257 let container_port = lb.get("containerPort").and_then(|v| v.as_i64());
2258 let Some(tg_arn) = tg_arn else { continue };
2259 let Some(container_name) = container_name else {
2260 continue;
2261 };
2262
2263 let target_id = if network_mode == Some("awsvpc") {
2264 task.attachments
2265 .iter()
2266 .find(|a| a.attachment_type == "eni")
2267 .and_then(|eni| {
2268 eni.details
2269 .iter()
2270 .find(|d| d.name == "privateIPv4Address")
2271 .map(|d| d.value.clone())
2272 })
2273 } else {
2274 Some("127.0.0.1".to_string())
2275 };
2276
2277 let port = if network_mode == Some("awsvpc") {
2278 container_port
2279 } else {
2280 task.containers
2281 .iter()
2282 .find(|c| c.name == container_name)
2283 .and_then(|c| {
2284 c.network_bindings
2285 .iter()
2286 .find(|nb| {
2287 nb.get("containerPort").and_then(|v| v.as_i64()) == container_port
2288 })
2289 .and_then(|nb| nb.get("hostPort").and_then(|v| v.as_i64()))
2290 })
2291 };
2292
2293 if let Some(id) = target_id {
2294 if let Some(entry) = result.iter_mut().find(|(arn, _)| arn == tg_arn) {
2295 entry.1.push((id, port));
2296 } else {
2297 result.push((tg_arn.to_string(), vec![(id, port)]));
2298 }
2299 }
2300 }
2301 result
2302}
2303
2304struct TaskSnapshot {
2305 task_arn: String,
2306 cluster_arn: String,
2307 launch_type: String,
2308 group: Option<String>,
2309 task_definition_arn: String,
2310 containers: serde_json::Value,
2311}
2312
2313fn snapshot_task(state: &SharedEcsState, account_id: &str, task_id: &str) -> Option<TaskSnapshot> {
2314 let accounts = state.read();
2315 let s = accounts.get(account_id)?;
2316 let task = s.tasks.get(task_id)?;
2317 Some(TaskSnapshot {
2318 task_arn: task.task_arn.clone(),
2319 cluster_arn: task.cluster_arn.clone(),
2320 launch_type: task.launch_type.clone(),
2321 group: task.group.clone(),
2322 task_definition_arn: task.task_definition_arn.clone(),
2323 containers: serde_json::Value::Array(
2324 task.containers
2325 .iter()
2326 .map(|c| {
2327 serde_json::json!({
2328 "containerArn": c.container_arn,
2329 "name": c.name,
2330 "image": c.image,
2331 "lastStatus": c.last_status,
2332 "exitCode": c.exit_code,
2333 "reason": c.reason,
2334 })
2335 })
2336 .collect(),
2337 ),
2338 })
2339}
2340
2341fn cli_works(cli: &str) -> bool {
2342 std::process::Command::new(cli)
2343 .arg("info")
2344 .stdout(std::process::Stdio::null())
2345 .stderr(std::process::Stdio::null())
2346 .status()
2347 .map(|s| s.success())
2348 .unwrap_or(false)
2349}
2350
2351fn build_local_registry_docker_config(server_port: u16) -> Option<TempDir> {
2356 let dir = TempDir::new().ok()?;
2357 let auth = base64::engine::general_purpose::STANDARD.encode("AWS:fakecloud-ecs-runtime");
2358 let config = serde_json::json!({
2359 "auths": {
2360 format!("127.0.0.1:{server_port}"): { "auth": auth },
2361 }
2362 });
2363 std::fs::write(dir.path().join("config.json"), config.to_string()).ok()?;
2364 Some(dir)
2365}
2366
2367fn find_container_definition(
2368 state: &crate::state::EcsState,
2369 family: &str,
2370 revision: i32,
2371 name: &str,
2372) -> Option<serde_json::Value> {
2373 state
2374 .task_definitions
2375 .get(family)?
2376 .get(&revision)?
2377 .container_definitions
2378 .iter()
2379 .find(|c| c.get("name").and_then(|v| v.as_str()) == Some(name))
2380 .cloned()
2381}
2382
2383fn mark_pull_started(state: &SharedEcsState, account_id: &str, task_id: &str) {
2384 let mut accounts = state.write();
2385 let Some(s) = accounts.get_mut(account_id) else {
2386 return;
2387 };
2388 let task_arn_cluster = s
2389 .tasks
2390 .get(task_id)
2391 .map(|t| (t.task_arn.clone(), t.cluster_arn.clone()));
2392 if let Some(task) = s.tasks.get_mut(task_id) {
2393 task.pull_started_at = Some(Utc::now());
2394 }
2395 if let Some((arn, cluster_arn)) = task_arn_cluster {
2396 s.push_event(LifecycleEvent {
2397 at: Utc::now(),
2398 event_type: "PullStarted".into(),
2399 task_arn: Some(arn),
2400 cluster_arn: Some(cluster_arn),
2401 last_status: Some("PENDING".into()),
2402 detail: serde_json::json!({}),
2403 });
2404 }
2405}
2406
2407fn mark_pull_stopped(state: &SharedEcsState, account_id: &str, task_id: &str) {
2408 let mut accounts = state.write();
2409 let Some(s) = accounts.get_mut(account_id) else {
2410 return;
2411 };
2412 if let Some(task) = s.tasks.get_mut(task_id) {
2413 task.pull_stopped_at = Some(Utc::now());
2414 }
2415}
2416
2417pub(crate) fn mark_running_multi(
2418 state: &SharedEcsState,
2419 account_id: &str,
2420 task_id: &str,
2421 started: &[RunningContainer],
2422) {
2423 let mut accounts = state.write();
2424 let Some(s) = accounts.get_mut(account_id) else {
2425 return;
2426 };
2427 let (arn, cluster_arn) = {
2428 let Some(task) = s.tasks.get_mut(task_id) else {
2429 return;
2430 };
2431 task.last_status = "RUNNING".into();
2432 task.connectivity = "CONNECTED".into();
2433 task.connectivity_at = Some(Utc::now());
2434 task.started_at = Some(Utc::now());
2435 for rc in started {
2436 if let Some(c) = task.containers.iter_mut().find(|c| c.name == rc.name) {
2437 c.runtime_id = Some(rc.container_id.clone());
2438 c.last_status = "RUNNING".into();
2439 c.network_bindings = rc.network_bindings.clone();
2440 if rc.image_digest.is_some() {
2441 c.image_digest = rc.image_digest.clone();
2442 }
2443 }
2444 }
2445 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
2446 cluster.running_tasks_count += 1;
2447 if cluster.pending_tasks_count > 0 {
2448 cluster.pending_tasks_count -= 1;
2449 }
2450 }
2451 if let Some(ref ci_arn) = task.container_instance_arn {
2452 if let Some(ci) = s
2453 .container_instances
2454 .values_mut()
2455 .find(|ci| ci.container_instance_arn == *ci_arn)
2456 {
2457 ci.running_tasks_count += 1;
2458 if ci.pending_tasks_count > 0 {
2459 ci.pending_tasks_count -= 1;
2460 }
2461 }
2462 }
2463 (task.task_arn.clone(), task.cluster_arn.clone())
2464 };
2465 s.push_event(LifecycleEvent {
2466 at: Utc::now(),
2467 event_type: "TaskStateChange".into(),
2468 task_arn: Some(arn),
2469 cluster_arn: Some(cluster_arn),
2470 last_status: Some("RUNNING".into()),
2471 detail: serde_json::json!({}),
2472 });
2473}
2474
2475#[allow(clippy::too_many_arguments)]
2476fn finalize_stopped_multi(
2477 state: &SharedEcsState,
2478 account_id: &str,
2479 task_id: &str,
2480 final_containers: &[RunningContainer],
2481 primary_exit_code: i64,
2482 captured: &str,
2483 stop_code: &str,
2484 stopped_reason: Option<String>,
2485) {
2486 let mut accounts = state.write();
2487 let Some(s) = accounts.get_mut(account_id) else {
2488 return;
2489 };
2490 let (arn, cluster_arn) = {
2491 let Some(task) = s.tasks.get_mut(task_id) else {
2492 return;
2493 };
2494 task.last_status = "STOPPED".into();
2495 task.desired_status = "STOPPED".into();
2496 task.stopping_at = task.stopping_at.or(Some(Utc::now()));
2497 task.stopped_at = Some(Utc::now());
2498 task.stop_code = Some(stop_code.into());
2499 task.stopped_reason = stopped_reason.or(Some(format!("Exit code {}", primary_exit_code)));
2500 task.captured_logs = captured.to_string();
2501 for c in task.containers.iter_mut() {
2502 c.last_status = "STOPPED".into();
2503 if c.exit_code.is_none() {
2504 let mapped = final_containers
2505 .iter()
2506 .find(|r| r.name == c.name)
2507 .and_then(|r| r.exit_code);
2508 c.exit_code = mapped.or(Some(primary_exit_code));
2509 }
2510 }
2511 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
2512 if cluster.running_tasks_count > 0 {
2513 cluster.running_tasks_count -= 1;
2514 }
2515 }
2516 if let Some(ref ci_arn) = task.container_instance_arn {
2517 if let Some(ci) = s
2518 .container_instances
2519 .values_mut()
2520 .find(|ci| ci.container_instance_arn == *ci_arn)
2521 {
2522 if ci.running_tasks_count > 0 {
2523 ci.running_tasks_count -= 1;
2524 }
2525 }
2526 }
2527 (task.task_arn.clone(), task.cluster_arn.clone())
2528 };
2529 s.push_event(LifecycleEvent {
2530 at: Utc::now(),
2531 event_type: "TaskStateChange".into(),
2532 task_arn: Some(arn),
2533 cluster_arn: Some(cluster_arn),
2534 last_status: Some("STOPPED".into()),
2535 detail: serde_json::json!({
2536 "exitCode": primary_exit_code,
2537 "stopCode": stop_code,
2538 }),
2539 });
2540}
2541
2542fn finalize_failure(state: &SharedEcsState, account_id: &str, task_id: &str, reason: &str) {
2543 let mut accounts = state.write();
2544 let Some(s) = accounts.get_mut(account_id) else {
2545 return;
2546 };
2547 let (arn, cluster_arn) = {
2548 let Some(task) = s.tasks.get_mut(task_id) else {
2549 return;
2550 };
2551 let was_running = task.last_status == "RUNNING";
2557 task.last_status = "STOPPED".into();
2558 task.desired_status = "STOPPED".into();
2559 task.stopped_at = Some(Utc::now());
2560 task.stop_code = Some("TaskFailedToStart".into());
2561 task.stopped_reason = Some(reason.to_string());
2562 task.captured_logs = format!("[task failed to start]: {reason}");
2566 for c in task.containers.iter_mut() {
2567 c.last_status = "STOPPED".into();
2568 c.reason = Some(reason.to_string());
2569 }
2570 if let Some(cluster) = s.clusters.get_mut(&task.cluster_name) {
2571 if was_running {
2572 if cluster.running_tasks_count > 0 {
2573 cluster.running_tasks_count -= 1;
2574 }
2575 } else if cluster.pending_tasks_count > 0 {
2576 cluster.pending_tasks_count -= 1;
2577 }
2578 }
2579 if let Some(ref ci_arn) = task.container_instance_arn {
2580 if let Some(ci) = s
2581 .container_instances
2582 .values_mut()
2583 .find(|ci| ci.container_instance_arn == *ci_arn)
2584 {
2585 if was_running {
2586 if ci.running_tasks_count > 0 {
2587 ci.running_tasks_count -= 1;
2588 }
2589 } else if ci.pending_tasks_count > 0 {
2590 ci.pending_tasks_count -= 1;
2591 }
2592 }
2593 }
2594 (task.task_arn.clone(), task.cluster_arn.clone())
2595 };
2596 s.push_event(LifecycleEvent {
2597 at: Utc::now(),
2598 event_type: "TaskFailedToStart".into(),
2599 task_arn: Some(arn),
2600 cluster_arn: Some(cluster_arn),
2601 last_status: Some("STOPPED".into()),
2602 detail: serde_json::json!({ "reason": reason }),
2603 });
2604}
2605
2606pub async fn sleep(duration: Duration) {
2610 tokio::time::sleep(duration).await;
2611}
2612
2613#[cfg(test)]
2614mod tests {
2615 use super::*;
2616 use crate::state::{EcsState, Task};
2617 use fakecloud_aws::arn::Arn;
2618 use fakecloud_core::multi_account::MultiAccountState;
2619 use parking_lot::RwLock;
2620 use std::sync::Arc;
2621
2622 #[test]
2623 fn cli_works_for_known_missing_binary_is_false() {
2624 assert!(!cli_works("definitely-not-a-real-cli-binary-xyz"));
2625 }
2626
2627 #[test]
2628 fn aws_ecr_uris_translate_for_local_pull() {
2629 assert_eq!(
2630 fakecloud_core::ecr_uri::translate_to_local(
2631 "123456789012.dkr.ecr.us-east-1.amazonaws.com/app:latest",
2632 4566
2633 )
2634 .as_deref(),
2635 Some("127.0.0.1:4566/app:latest")
2636 );
2637 }
2638
2639 fn make_task(task_id: &str) -> Task {
2640 Task {
2641 task_arn: Arn::new(
2642 "ecs",
2643 "us-east-1",
2644 "000000000000",
2645 &format!("task/default/{task_id}"),
2646 )
2647 .to_string(),
2648 task_id: task_id.into(),
2649 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
2650 cluster_name: "default".into(),
2651 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
2652 family: "app".into(),
2653 revision: 1,
2654 container_instance_arn: None,
2655 capacity_provider_name: None,
2656 last_status: "PENDING".into(),
2657 desired_status: "RUNNING".into(),
2658 launch_type: "FARGATE".into(),
2659 platform_version: None,
2660 cpu: None,
2661 memory: None,
2662 containers: Vec::new(),
2663 overrides: serde_json::json!({}),
2664 started_by: None,
2665 group: None,
2666 connectivity: "CONNECTING".into(),
2667 stop_code: None,
2668 stopped_reason: None,
2669 created_at: Utc::now(),
2670 started_at: None,
2671 stopping_at: None,
2672 stopped_at: None,
2673 pull_started_at: None,
2674 pull_stopped_at: None,
2675 connectivity_at: None,
2676 started_by_ref_id: None,
2677 execution_role_arn: None,
2678 task_role_arn: None,
2679 tags: Vec::new(),
2680 awslogs: None,
2681 captured_logs: String::new(),
2682 protection: None,
2683 enable_execute_command: false,
2684 attachments: Vec::new(),
2685 volume_configurations: Vec::new(),
2686 task_set_arn: None,
2687 }
2688 }
2689
2690 #[test]
2691 fn finalize_failure_writes_reason_into_captured_logs() {
2692 let mut accounts: MultiAccountState<EcsState> =
2693 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2694 let acct = accounts.get_or_create("000000000000");
2695 acct.tasks.insert("t1".into(), make_task("t1"));
2696 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2697
2698 finalize_failure(
2699 &state,
2700 "000000000000",
2701 "t1",
2702 "failed to resolve secret DB_PASSWORD",
2703 );
2704
2705 let accounts = state.read();
2706 let task = accounts
2707 .get("000000000000")
2708 .unwrap()
2709 .tasks
2710 .get("t1")
2711 .unwrap();
2712 assert_eq!(task.last_status, "STOPPED");
2713 assert_eq!(task.stop_code.as_deref(), Some("TaskFailedToStart"));
2714 assert!(
2715 task.captured_logs
2716 .contains("failed to resolve secret DB_PASSWORD"),
2717 "captured_logs missing reason: {:?}",
2718 task.captured_logs
2719 );
2720 assert!(
2721 task.captured_logs.starts_with("[task failed to start]:"),
2722 "captured_logs missing prefix: {:?}",
2723 task.captured_logs
2724 );
2725 }
2726
2727 fn make_container(name: &str, essential: bool) -> crate::state::Container {
2728 crate::state::Container {
2729 container_arn: format!(
2730 "arn:aws:ecs:us-east-1:000000000000:container/default/abc/{name}"
2731 ),
2732 name: name.into(),
2733 image: "alpine".into(),
2734 task_arn: "arn:aws:ecs:us-east-1:000000000000:task/default/abc".into(),
2735 last_status: "RUNNING".into(),
2736 exit_code: None,
2737 reason: None,
2738 runtime_id: Some(format!("dockerid-{name}")),
2739 essential,
2740 cpu: None,
2741 memory: None,
2742 memory_reservation: None,
2743 network_bindings: Vec::new(),
2744 network_interfaces: Vec::new(),
2745 health_status: None,
2746 managed_agents: None,
2747 image_digest: None,
2748 }
2749 }
2750
2751 #[test]
2752 fn task_should_stop_when_essential_exits() {
2753 let containers = vec![
2754 RunningContainer {
2755 name: "app".into(),
2756 container_id: "id-app".into(),
2757 essential: true,
2758 exit_code: Some(0),
2759 network_bindings: Vec::new(),
2760 image_digest: None,
2761 },
2762 RunningContainer {
2763 name: "sidecar".into(),
2764 container_id: "id-sc".into(),
2765 essential: false,
2766 exit_code: None,
2767 network_bindings: Vec::new(),
2768 image_digest: None,
2769 },
2770 ];
2771 assert!(task_should_stop(&containers));
2772 }
2773
2774 #[test]
2775 fn task_keeps_running_when_only_non_essential_exits() {
2776 let containers = vec![
2777 RunningContainer {
2778 name: "app".into(),
2779 container_id: "id-app".into(),
2780 essential: true,
2781 exit_code: None,
2782 network_bindings: Vec::new(),
2783 image_digest: None,
2784 },
2785 RunningContainer {
2786 name: "sidecar".into(),
2787 container_id: "id-sc".into(),
2788 essential: false,
2789 exit_code: Some(0),
2790 network_bindings: Vec::new(),
2791 image_digest: None,
2792 },
2793 ];
2794 assert!(!task_should_stop(&containers));
2795 }
2796
2797 #[test]
2798 fn task_stops_when_all_non_essentials_exit() {
2799 let containers = vec![
2800 RunningContainer {
2801 name: "a".into(),
2802 container_id: "id-a".into(),
2803 essential: false,
2804 exit_code: Some(0),
2805 network_bindings: Vec::new(),
2806 image_digest: None,
2807 },
2808 RunningContainer {
2809 name: "b".into(),
2810 container_id: "id-b".into(),
2811 essential: false,
2812 exit_code: Some(1),
2813 network_bindings: Vec::new(),
2814 image_digest: None,
2815 },
2816 ];
2817 assert!(task_should_stop(&containers));
2818 }
2819
2820 #[test]
2821 fn finalize_stopped_multi_assigns_per_container_exit_codes() {
2822 let mut accounts: MultiAccountState<EcsState> =
2823 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
2824 let acct = accounts.get_or_create("000000000000");
2825 let mut t = make_task("t1");
2826 t.containers = vec![
2827 make_container("app", true),
2828 make_container("sidecar", false),
2829 ];
2830 acct.tasks.insert("t1".into(), t);
2831 let state: SharedEcsState = Arc::new(RwLock::new(accounts));
2832
2833 let final_containers = vec![
2834 RunningContainer {
2835 name: "app".into(),
2836 container_id: "id-app".into(),
2837 essential: true,
2838 exit_code: Some(0),
2839 network_bindings: Vec::new(),
2840 image_digest: None,
2841 },
2842 RunningContainer {
2843 name: "sidecar".into(),
2844 container_id: "id-sc".into(),
2845 essential: false,
2846 exit_code: Some(137),
2847 network_bindings: Vec::new(),
2848 image_digest: None,
2849 },
2850 ];
2851 finalize_stopped_multi(
2852 &state,
2853 "000000000000",
2854 "t1",
2855 &final_containers,
2856 0,
2857 "captured",
2858 "EssentialContainerExited",
2859 None,
2860 );
2861
2862 let accounts = state.read();
2863 let task = accounts
2864 .get("000000000000")
2865 .unwrap()
2866 .tasks
2867 .get("t1")
2868 .unwrap();
2869 assert_eq!(task.last_status, "STOPPED");
2870 assert_eq!(task.stop_code.as_deref(), Some("EssentialContainerExited"));
2871 let app = task.containers.iter().find(|c| c.name == "app").unwrap();
2872 let sc = task
2873 .containers
2874 .iter()
2875 .find(|c| c.name == "sidecar")
2876 .unwrap();
2877 assert_eq!(app.exit_code, Some(0));
2878 assert_eq!(sc.exit_code, Some(137));
2879 assert_eq!(app.last_status, "STOPPED");
2880 assert_eq!(sc.last_status, "STOPPED");
2881 }
2882
2883 fn plan(name: &str, deps: &[&str]) -> ContainerPlan {
2884 ContainerPlan {
2885 container_name: name.into(),
2886 image: "alpine".into(),
2887 env: Vec::new(),
2888 entry_point: Vec::new(),
2889 command: Vec::new(),
2890 secrets_refs: Vec::new(),
2891 essential: true,
2892 has_task_role: false,
2893 port_mappings: Vec::new(),
2894 network_mode: None,
2895 depends_on: deps
2896 .iter()
2897 .map(|s| DependsOn {
2898 container_name: (*s).to_string(),
2899 condition: DependsOnCondition::Start,
2900 })
2901 .collect(),
2902 health_check: None,
2903 volume_mounts: Vec::new(),
2904 ulimits: Vec::new(),
2905 linux_parameters: None,
2906 stop_timeout: None,
2907 user: None,
2908 working_directory: None,
2909 tty: false,
2910 interactive: false,
2911 readonly_rootfs: false,
2912 }
2913 }
2914
2915 #[test]
2916 fn topo_sort_orders_by_depends_on() {
2917 let plans = vec![plan("sidecar", &["app"]), plan("app", &[])];
2920 let ordered = topo_sort_plans(plans);
2921 assert_eq!(ordered[0].container_name, "app");
2922 assert_eq!(ordered[1].container_name, "sidecar");
2923 }
2924
2925 #[test]
2926 fn topo_sort_preserves_declaration_order_when_no_deps() {
2927 let plans = vec![plan("first", &[]), plan("second", &[]), plan("third", &[])];
2928 let ordered = topo_sort_plans(plans);
2929 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2930 assert_eq!(names, vec!["first", "second", "third"]);
2931 }
2932
2933 #[test]
2934 fn topo_sort_handles_chain() {
2935 let plans = vec![plan("c", &["b"]), plan("b", &["a"]), plan("a", &[])];
2938 let ordered = topo_sort_plans(plans);
2939 let names: Vec<&str> = ordered.iter().map(|p| p.container_name.as_str()).collect();
2940 assert_eq!(names, vec!["a", "b", "c"]);
2941 }
2942
2943 #[test]
2944 fn topo_sort_ignores_unknown_dependency() {
2945 let plans = vec![plan("only", &["does-not-exist"])];
2949 let ordered = topo_sort_plans(plans);
2950 assert_eq!(ordered.len(), 1);
2951 assert_eq!(ordered[0].container_name, "only");
2952 }
2953
2954 #[test]
2955 fn topo_sort_recovers_from_cycle() {
2956 let plans = vec![plan("a", &["b"]), plan("b", &["a"])];
2959 let ordered = topo_sort_plans(plans);
2960 assert_eq!(ordered.len(), 2);
2961 }
2962
2963 #[test]
2964 fn parse_health_check_fills_aws_defaults() {
2965 let v = serde_json::json!({
2966 "command": ["CMD-SHELL", "curl -f http://localhost/ || exit 1"],
2967 });
2968 let hc = __test_parse_health_check(&v).expect("parsed");
2969 assert_eq!(hc.command[0], "CMD-SHELL");
2970 assert_eq!(hc.interval_seconds, 30);
2971 assert_eq!(hc.timeout_seconds, 5);
2972 assert_eq!(hc.retries, 3);
2973 assert_eq!(hc.start_period_seconds, 0);
2974 }
2975
2976 #[test]
2977 fn parse_health_check_overrides_explicit_values() {
2978 let v = serde_json::json!({
2979 "command": ["CMD", "/probe"],
2980 "interval": 7,
2981 "timeout": 2,
2982 "retries": 9,
2983 "startPeriod": 12,
2984 });
2985 let hc = __test_parse_health_check(&v).expect("parsed");
2986 assert_eq!(hc.interval_seconds, 7);
2987 assert_eq!(hc.timeout_seconds, 2);
2988 assert_eq!(hc.retries, 9);
2989 assert_eq!(hc.start_period_seconds, 12);
2990 }
2991
2992 #[test]
2993 fn parse_health_check_returns_none_for_none_sentinel() {
2994 let v = serde_json::json!({ "command": ["NONE"] });
2997 assert!(__test_parse_health_check(&v).is_none());
2998 }
2999
3000 #[test]
3001 fn parse_health_check_returns_none_for_missing_command() {
3002 let v = serde_json::json!({ "interval": 30 });
3003 assert!(__test_parse_health_check(&v).is_none());
3004 }
3005
3006 #[test]
3007 fn render_health_flags_emits_full_set_for_cmd_shell() {
3008 let hc = HealthCheckSpec {
3009 command: vec!["CMD-SHELL".into(), "curl -f http://localhost/".into()],
3010 interval_seconds: 15,
3011 timeout_seconds: 3,
3012 retries: 4,
3013 start_period_seconds: 10,
3014 };
3015 let flags = render_health_flags(&hc);
3016 assert_eq!(flags[0], "--health-cmd");
3017 assert_eq!(flags[1], "curl -f http://localhost/");
3018 assert!(flags.contains(&"--health-interval=15s".to_string()));
3019 assert!(flags.contains(&"--health-timeout=3s".to_string()));
3020 assert!(flags.contains(&"--health-retries=4".to_string()));
3021 assert!(flags.contains(&"--health-start-period=10s".to_string()));
3022 }
3023
3024 #[test]
3025 fn render_health_flags_joins_cmd_argv_with_spaces() {
3026 let hc = HealthCheckSpec {
3029 command: vec![
3030 "CMD".into(),
3031 "/bin/probe".into(),
3032 "--port".into(),
3033 "8080".into(),
3034 ],
3035 interval_seconds: 30,
3036 timeout_seconds: 5,
3037 retries: 3,
3038 start_period_seconds: 0,
3039 };
3040 let flags = render_health_flags(&hc);
3041 assert_eq!(flags[1], "/bin/probe --port 8080");
3042 }
3043
3044 #[test]
3045 fn build_run_argv_emits_health_flags_when_present() {
3046 let plan = ContainerPlan {
3047 container_name: "app".into(),
3048 image: "alpine".into(),
3049 env: Vec::new(),
3050 entry_point: Vec::new(),
3051 command: Vec::new(),
3052 secrets_refs: Vec::new(),
3053 essential: true,
3054 has_task_role: false,
3055 port_mappings: Vec::new(),
3056 network_mode: None,
3057 depends_on: Vec::new(),
3058 health_check: Some(HealthCheckSpec {
3059 command: vec!["CMD-SHELL".into(), "true".into()],
3060 interval_seconds: 5,
3061 timeout_seconds: 2,
3062 retries: 1,
3063 start_period_seconds: 1,
3064 }),
3065 volume_mounts: Vec::new(),
3066 ulimits: Vec::new(),
3067 linux_parameters: None,
3068 stop_timeout: None,
3069 user: None,
3070 working_directory: None,
3071 tty: false,
3072 interactive: false,
3073 readonly_rootfs: false,
3074 };
3075 let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine");
3076 let joined = argv.join(" ");
3077 assert!(joined.contains("--health-cmd true"), "argv: {joined}");
3078 assert!(joined.contains("--health-interval=5s"), "argv: {joined}");
3079 assert!(joined.contains("--health-timeout=2s"), "argv: {joined}");
3080 assert!(joined.contains("--health-retries=1"), "argv: {joined}");
3081 assert!(
3082 joined.contains("--health-start-period=1s"),
3083 "argv: {joined}"
3084 );
3085 }
3086
3087 #[test]
3088 fn build_run_argv_emits_no_health_flags_when_absent() {
3089 let plan = ContainerPlan {
3090 container_name: "app".into(),
3091 image: "alpine".into(),
3092 env: Vec::new(),
3093 entry_point: Vec::new(),
3094 command: Vec::new(),
3095 secrets_refs: Vec::new(),
3096 essential: true,
3097 has_task_role: false,
3098 port_mappings: Vec::new(),
3099 network_mode: None,
3100 depends_on: Vec::new(),
3101 health_check: None,
3102 volume_mounts: Vec::new(),
3103 ulimits: Vec::new(),
3104 linux_parameters: None,
3105 stop_timeout: None,
3106 user: None,
3107 working_directory: None,
3108 tty: false,
3109 interactive: false,
3110 readonly_rootfs: false,
3111 };
3112 let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine");
3113 assert!(!argv.iter().any(|s| s.starts_with("--health")));
3114 }
3115
3116 #[test]
3117 fn docker_health_to_ecs_maps_known_states() {
3118 assert_eq!(docker_health_to_ecs("healthy"), "HEALTHY");
3119 assert_eq!(docker_health_to_ecs("HEALTHY"), "HEALTHY");
3120 assert_eq!(docker_health_to_ecs("unhealthy"), "UNHEALTHY");
3121 assert_eq!(docker_health_to_ecs("starting"), "UNKNOWN");
3122 assert_eq!(docker_health_to_ecs("none"), "UNKNOWN");
3123 assert_eq!(docker_health_to_ecs(""), "UNKNOWN");
3124 }
3125
3126 #[test]
3129 fn resolve_host_bind_volume_uses_source_path() {
3130 let mut volumes = std::collections::HashMap::new();
3131 let v = serde_json::json!({
3132 "name": "data",
3133 "host": { "sourcePath": "/var/lib/myapp" }
3134 });
3135 volumes.insert("data".to_string(), &v);
3136 let mp = serde_json::json!({
3137 "sourceVolume": "data",
3138 "containerPath": "/app/data",
3139 "readOnly": false
3140 });
3141 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3142 assert_eq!(resolved.source, "/var/lib/myapp");
3143 assert_eq!(resolved.container_path, "/app/data");
3144 assert!(!resolved.read_only);
3145 }
3146
3147 #[test]
3150 fn read_only_mount_renders_ro_suffix() {
3151 let plan = ContainerPlan {
3152 container_name: "app".into(),
3153 image: "alpine".into(),
3154 env: Vec::new(),
3155 entry_point: Vec::new(),
3156 command: Vec::new(),
3157 secrets_refs: Vec::new(),
3158 essential: true,
3159 has_task_role: false,
3160 port_mappings: Vec::new(),
3161 network_mode: None,
3162 depends_on: Vec::new(),
3163 health_check: None,
3164 volume_mounts: vec![VolumeMount {
3165 source: "/host/path".into(),
3166 container_path: "/in/container".into(),
3167 read_only: true,
3168 }],
3169 ulimits: Vec::new(),
3170 linux_parameters: None,
3171 stop_timeout: None,
3172 user: None,
3173 working_directory: None,
3174 tty: false,
3175 interactive: false,
3176 readonly_rootfs: false,
3177 };
3178 let argv = build_run_argv(&plan, &[], "task-1", "host-gateway", "alpine");
3179 let pair = argv
3180 .windows(2)
3181 .find(|w| w[0] == "-v")
3182 .expect("expected -v flag");
3183 assert_eq!(pair[1], "/host/path:/in/container:ro");
3184 }
3185
3186 #[test]
3191 fn resolve_efs_volume_uses_stub_dir() {
3192 let mut volumes = std::collections::HashMap::new();
3193 let v = serde_json::json!({
3194 "name": "efs-vol",
3195 "efsVolumeConfiguration": {
3196 "fileSystemId": "fs-12345678",
3197 "rootDirectory": "/exports/app"
3198 }
3199 });
3200 volumes.insert("efs-vol".to_string(), &v);
3201 let mp = serde_json::json!({
3202 "sourceVolume": "efs-vol",
3203 "containerPath": "/mnt/efs"
3204 });
3205 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3206 assert_eq!(
3207 resolved.source,
3208 "/tmp/fakecloud/efs/fs-12345678/exports/app"
3209 );
3210 assert_eq!(resolved.container_path, "/mnt/efs");
3211 }
3212
3213 #[test]
3217 fn efs_without_root_directory_uses_filesystem_root() {
3218 assert_eq!(
3219 stub_dir_for("efs", "fs-abc", "/"),
3220 "/tmp/fakecloud/efs/fs-abc"
3221 );
3222 assert_eq!(
3223 stub_dir_for("efs", "fs-abc", ""),
3224 "/tmp/fakecloud/efs/fs-abc"
3225 );
3226 }
3227
3228 #[test]
3232 fn resolve_docker_named_volume_uses_volume_name() {
3233 let mut volumes = std::collections::HashMap::new();
3234 let v = serde_json::json!({
3235 "name": "named-vol",
3236 "dockerVolumeConfiguration": {
3237 "scope": "task",
3238 "driver": "local"
3239 }
3240 });
3241 volumes.insert("named-vol".to_string(), &v);
3242 let mp = serde_json::json!({
3243 "sourceVolume": "named-vol",
3244 "containerPath": "/data"
3245 });
3246 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3247 assert_eq!(resolved.source, "named-vol");
3248 assert_eq!(resolved.container_path, "/data");
3249 }
3250
3251 #[test]
3254 fn resolve_fsx_volume_uses_stub_dir() {
3255 let mut volumes = std::collections::HashMap::new();
3256 let v = serde_json::json!({
3257 "name": "fsx-vol",
3258 "fsxWindowsFileServerVolumeConfiguration": {
3259 "fileSystemId": "fs-xyz",
3260 "rootDirectory": "share"
3261 }
3262 });
3263 volumes.insert("fsx-vol".to_string(), &v);
3264 let mp = serde_json::json!({
3265 "sourceVolume": "fsx-vol",
3266 "containerPath": "C:\\data"
3267 });
3268 let resolved = resolve_mount_point(&mp, &volumes).expect("resolved");
3269 assert_eq!(resolved.source, "/tmp/fakecloud/fsx/fs-xyz/share");
3270 }
3271
3272 #[test]
3276 fn unknown_source_volume_returns_none() {
3277 let volumes = std::collections::HashMap::new();
3278 let mp = serde_json::json!({
3279 "sourceVolume": "missing",
3280 "containerPath": "/x"
3281 });
3282 assert!(resolve_mount_point(&mp, &volumes).is_none());
3283 }
3284
3285 #[test]
3289 fn find_depends_on_cycle_detects_two_node_cycle() {
3290 let cds = vec![
3291 serde_json::json!({
3292 "name": "a",
3293 "image": "alpine",
3294 "dependsOn": [{"containerName": "b", "condition": "START"}],
3295 }),
3296 serde_json::json!({
3297 "name": "b",
3298 "image": "alpine",
3299 "dependsOn": [{"containerName": "a", "condition": "START"}],
3300 }),
3301 ];
3302 let cycle = find_depends_on_cycle(&cds);
3303 assert!(cycle.is_some(), "expected cycle to be detected");
3304 }
3305
3306 #[test]
3310 fn find_depends_on_cycle_accepts_chain() {
3311 let cds = vec![
3312 serde_json::json!({
3313 "name": "a",
3314 "image": "alpine",
3315 "dependsOn": [{"containerName": "b", "condition": "START"}],
3316 }),
3317 serde_json::json!({
3318 "name": "b",
3319 "image": "alpine",
3320 "dependsOn": [{"containerName": "c", "condition": "START"}],
3321 }),
3322 serde_json::json!({
3323 "name": "c",
3324 "image": "alpine",
3325 }),
3326 ];
3327 assert!(find_depends_on_cycle(&cds).is_none());
3328 }
3329
3330 #[test]
3334 fn find_depends_on_cycle_ignores_unknown_target() {
3335 let cds = vec![serde_json::json!({
3336 "name": "only",
3337 "image": "alpine",
3338 "dependsOn": [{"containerName": "ghost", "condition": "START"}],
3339 })];
3340 assert!(find_depends_on_cycle(&cds).is_none());
3341 }
3342
3343 #[test]
3347 fn condition_is_met_matches_aws_semantics() {
3348 let running = InspectedState {
3349 started: true,
3350 exited: false,
3351 exit_code: 0,
3352 health: None,
3353 };
3354 let exited_ok = InspectedState {
3355 started: true,
3356 exited: true,
3357 exit_code: 0,
3358 health: None,
3359 };
3360 let exited_fail = InspectedState {
3361 started: true,
3362 exited: true,
3363 exit_code: 1,
3364 health: None,
3365 };
3366 let healthy = InspectedState {
3367 started: true,
3368 exited: false,
3369 exit_code: 0,
3370 health: Some("healthy".into()),
3371 };
3372
3373 assert!(condition_is_met(DependsOnCondition::Start, &running));
3376 assert!(condition_is_met(DependsOnCondition::Start, &exited_ok));
3377
3378 assert!(!condition_is_met(DependsOnCondition::Complete, &running));
3380 assert!(condition_is_met(DependsOnCondition::Complete, &exited_ok));
3381 assert!(condition_is_met(DependsOnCondition::Complete, &exited_fail));
3382
3383 assert!(!condition_is_met(DependsOnCondition::Success, &running));
3385 assert!(condition_is_met(DependsOnCondition::Success, &exited_ok));
3386 assert!(!condition_is_met(DependsOnCondition::Success, &exited_fail));
3387
3388 assert!(!condition_is_met(DependsOnCondition::Healthy, &running));
3390 assert!(condition_is_met(DependsOnCondition::Healthy, &healthy));
3391 }
3392
3393 #[test]
3397 fn depends_on_condition_parse_round_trips() {
3398 assert_eq!(
3399 DependsOnCondition::parse("START"),
3400 Some(DependsOnCondition::Start)
3401 );
3402 assert_eq!(
3403 DependsOnCondition::parse("COMPLETE"),
3404 Some(DependsOnCondition::Complete)
3405 );
3406 assert_eq!(
3407 DependsOnCondition::parse("SUCCESS"),
3408 Some(DependsOnCondition::Success)
3409 );
3410 assert_eq!(
3411 DependsOnCondition::parse("HEALTHY"),
3412 Some(DependsOnCondition::Healthy)
3413 );
3414 assert_eq!(DependsOnCondition::parse("start"), None);
3415 assert_eq!(DependsOnCondition::parse("ANY"), None);
3416 }
3417
3418 #[test]
3421 fn build_run_argv_emits_ulimits() {
3422 let plan = ContainerPlan {
3423 container_name: "app".into(),
3424 image: "alpine".into(),
3425 env: Vec::new(),
3426 entry_point: Vec::new(),
3427 command: Vec::new(),
3428 secrets_refs: Vec::new(),
3429 essential: true,
3430 has_task_role: false,
3431 port_mappings: Vec::new(),
3432 network_mode: None,
3433 depends_on: Vec::new(),
3434 health_check: None,
3435 volume_mounts: Vec::new(),
3436 ulimits: vec![Ulimit {
3437 name: "nofile".into(),
3438 soft_limit: 1024,
3439 hard_limit: 2048,
3440 }],
3441 linux_parameters: None,
3442 stop_timeout: None,
3443 user: None,
3444 working_directory: None,
3445 tty: false,
3446 interactive: false,
3447 readonly_rootfs: false,
3448 };
3449 let argv = build_run_argv(&plan, &[], "t", "host", "img");
3450 assert!(argv.contains(&"--ulimit".to_string()));
3451 assert!(argv.contains(&"nofile=1024:2048".to_string()));
3452 }
3453
3454 #[test]
3455 fn build_run_argv_emits_linux_parameters() {
3456 let plan = ContainerPlan {
3457 container_name: "app".into(),
3458 image: "alpine".into(),
3459 env: Vec::new(),
3460 entry_point: Vec::new(),
3461 command: Vec::new(),
3462 secrets_refs: Vec::new(),
3463 essential: true,
3464 has_task_role: false,
3465 port_mappings: Vec::new(),
3466 network_mode: None,
3467 depends_on: Vec::new(),
3468 health_check: None,
3469 volume_mounts: Vec::new(),
3470 ulimits: Vec::new(),
3471 linux_parameters: Some(LinuxParameters {
3472 capabilities_add: vec!["NET_ADMIN".into()],
3473 capabilities_drop: vec!["ALL".into()],
3474 devices: vec![Device {
3475 host_path: "/dev/zero".into(),
3476 container_path: "/dev/zero".into(),
3477 permissions: "rwm".into(),
3478 }],
3479 init_process_enabled: true,
3480 shared_memory_size: Some(256),
3481 sysctls: vec![Sysctl {
3482 name: "net.ipv4.ip_forward".into(),
3483 value: "1".into(),
3484 }],
3485 tmpfs: vec![Tmpfs {
3486 container_path: "/tmp".into(),
3487 size: 128,
3488 mount_options: vec!["noexec".into()],
3489 }],
3490 privileged: true,
3491 }),
3492 stop_timeout: Some(30),
3493 user: Some("1000:1000".into()),
3494 working_directory: Some("/app".into()),
3495 tty: true,
3496 interactive: true,
3497 readonly_rootfs: true,
3498 };
3499 let argv = build_run_argv(&plan, &[], "t", "host", "img");
3500 assert!(argv.contains(&"--cap-add".to_string()));
3501 assert!(argv.contains(&"NET_ADMIN".to_string()));
3502 assert!(argv.contains(&"--cap-drop".to_string()));
3503 assert!(argv.contains(&"ALL".to_string()));
3504 assert!(argv.contains(&"--device".to_string()));
3505 assert!(argv.contains(&"/dev/zero:/dev/zerorwm".to_string()));
3506 assert!(argv.contains(&"--init".to_string()));
3507 assert!(argv.contains(&"--shm-size".to_string()));
3508 assert!(argv.contains(&"256m".to_string()));
3509 assert!(argv.contains(&"--sysctl".to_string()));
3510 assert!(argv.contains(&"net.ipv4.ip_forward=1".to_string()));
3511 assert!(argv.contains(&"--tmpfs".to_string()));
3512 assert!(argv.contains(&"--privileged".to_string()));
3513 assert!(argv.contains(&"--stop-timeout".to_string()));
3514 assert!(argv.contains(&"30".to_string()));
3515 assert!(argv.contains(&"--user".to_string()));
3516 assert!(argv.contains(&"1000:1000".to_string()));
3517 assert!(argv.contains(&"--workdir".to_string()));
3518 assert!(argv.contains(&"/app".to_string()));
3519 assert!(argv.contains(&"--tty".to_string()));
3520 assert!(argv.contains(&"--interactive".to_string()));
3521 assert!(argv.contains(&"--read-only".to_string()));
3522 }
3523
3524 #[test]
3525 fn parse_linux_parameters_fills_defaults() {
3526 let raw = serde_json::json!({"initProcessEnabled": true});
3527 let lp = parse_linux_parameters(&raw).expect("parses");
3528 assert!(lp.init_process_enabled);
3529 assert!(!lp.privileged);
3530 assert!(lp.capabilities_add.is_empty());
3531 }
3532
3533 #[test]
3534 fn parse_device_uses_default_permissions() {
3535 let raw = serde_json::json!({"hostPath": "/dev/null", "containerPath": "/dev/null"});
3536 let dev = parse_device(&raw).expect("parses");
3537 assert_eq!(dev.permissions, "rwm");
3538 }
3539
3540 #[test]
3541 fn compute_elbv2_targets_empty_when_no_group() {
3542 let mut accounts: MultiAccountState<EcsState> =
3543 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3544 let acct = accounts.get_or_create("000000000000");
3545 let mut task = make_task("t1");
3546 task.group = None;
3547 acct.tasks.insert("t1".into(), task);
3548 let state = acct.clone();
3549 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3550 assert!(targets.is_empty());
3551 }
3552
3553 #[test]
3554 fn compute_elbv2_targets_bridge_mode_uses_localhost_and_host_port() {
3555 let mut accounts: MultiAccountState<EcsState> =
3556 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3557 let acct = accounts.get_or_create("000000000000");
3558
3559 let td = crate::state::TaskDefinition {
3560 family: "app".into(),
3561 revision: 1,
3562 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3563 container_definitions: Vec::new(),
3564 network_mode: Some("bridge".into()),
3565 status: "ACTIVE".into(),
3566 task_role_arn: None,
3567 execution_role_arn: None,
3568 requires_compatibilities: Vec::new(),
3569 compatibilities: Vec::new(),
3570 cpu: None,
3571 memory: None,
3572 pid_mode: None,
3573 ipc_mode: None,
3574 volumes: Vec::new(),
3575 placement_constraints: Vec::new(),
3576 proxy_configuration: None,
3577 inference_accelerators: Vec::new(),
3578 ephemeral_storage: None,
3579 runtime_platform: None,
3580 requires_attributes: Vec::new(),
3581 registered_at: Utc::now(),
3582 registered_by: None,
3583 deregistered_at: None,
3584 tags: Vec::new(),
3585 enable_fault_injection: None,
3586 };
3587 acct.task_definitions.insert("app".into(), {
3588 let mut m = std::collections::BTreeMap::new();
3589 m.insert(1, td);
3590 m
3591 });
3592
3593 let service = crate::state::Service {
3594 service_name: "svc".into(),
3595 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
3596 cluster_name: "default".into(),
3597 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
3598 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3599 family: "app".into(),
3600 revision: 1,
3601 desired_count: 1,
3602 running_count: 0,
3603 pending_count: 0,
3604 launch_type: "FARGATE".into(),
3605 status: "ACTIVE".into(),
3606 scheduling_strategy: "REPLICA".into(),
3607 deployment_controller: "ECS".into(),
3608 minimum_healthy_percent: Some(0),
3609 maximum_percent: Some(200),
3610 circuit_breaker: None,
3611 deployments: Vec::new(),
3612 load_balancers: vec![serde_json::json!({
3613 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
3614 "containerName": "app",
3615 "containerPort": 80,
3616 })],
3617 service_registries: Vec::new(),
3618 placement_constraints: Vec::new(),
3619 placement_strategy: Vec::new(),
3620 network_configuration: None,
3621 volume_configurations: vec![],
3622 tags: Vec::new(),
3623 created_at: Utc::now(),
3624 created_by: None,
3625 role_arn: None,
3626 platform_version: None,
3627 health_check_grace_period_seconds: None,
3628 enable_execute_command: false,
3629 enable_ecs_managed_tags: false,
3630 propagate_tags: None,
3631 capacity_provider_strategy: Vec::new(),
3632 availability_zone_rebalancing: None,
3633 };
3634 acct.services.insert(
3635 crate::state::EcsState::service_key("default", "svc"),
3636 service,
3637 );
3638
3639 let mut task = make_task("t1");
3640 task.group = Some("service:svc".into());
3641 task.containers = vec![crate::state::Container {
3642 container_arn: "arn:aws:ecs:us-east-1:000000000000:container/default/abc/app".into(),
3643 name: "app".into(),
3644 image: "alpine".into(),
3645 task_arn: task.task_arn.clone(),
3646 last_status: "RUNNING".into(),
3647 exit_code: None,
3648 reason: None,
3649 runtime_id: Some("dockerid-app".into()),
3650 essential: true,
3651 cpu: None,
3652 memory: None,
3653 memory_reservation: None,
3654 network_bindings: vec![serde_json::json!({
3655 "bindIP": "0.0.0.0",
3656 "containerPort": 80,
3657 "hostPort": 32768,
3658 "protocol": "tcp",
3659 })],
3660 network_interfaces: Vec::new(),
3661 health_status: None,
3662 managed_agents: None,
3663 image_digest: None,
3664 }];
3665 acct.tasks.insert("t1".into(), task);
3666
3667 let state = acct.clone();
3668 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3669 assert_eq!(targets.len(), 1);
3670 let (arn, tg_targets) = &targets[0];
3671 assert_eq!(
3672 arn,
3673 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3674 );
3675 assert_eq!(tg_targets.len(), 1);
3676 assert_eq!(tg_targets[0].0, "127.0.0.1");
3677 assert_eq!(tg_targets[0].1, Some(32768));
3678 }
3679
3680 #[test]
3681 fn compute_elbv2_targets_awsvpc_uses_eni_ip() {
3682 let mut accounts: MultiAccountState<EcsState> =
3683 MultiAccountState::new("000000000000", "us-east-1", "http://localhost:4566");
3684 let acct = accounts.get_or_create("000000000000");
3685
3686 let td = crate::state::TaskDefinition {
3687 family: "app".into(),
3688 revision: 1,
3689 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3690 container_definitions: Vec::new(),
3691 network_mode: Some("awsvpc".into()),
3692 status: "ACTIVE".into(),
3693 task_role_arn: None,
3694 execution_role_arn: None,
3695 requires_compatibilities: Vec::new(),
3696 compatibilities: Vec::new(),
3697 cpu: None,
3698 memory: None,
3699 pid_mode: None,
3700 ipc_mode: None,
3701 volumes: Vec::new(),
3702 placement_constraints: Vec::new(),
3703 proxy_configuration: None,
3704 inference_accelerators: Vec::new(),
3705 ephemeral_storage: None,
3706 runtime_platform: None,
3707 requires_attributes: Vec::new(),
3708 registered_at: Utc::now(),
3709 registered_by: None,
3710 deregistered_at: None,
3711 tags: Vec::new(),
3712 enable_fault_injection: None,
3713 };
3714 acct.task_definitions.insert("app".into(), {
3715 let mut m = std::collections::BTreeMap::new();
3716 m.insert(1, td);
3717 m
3718 });
3719
3720 let service = crate::state::Service {
3721 service_name: "svc".into(),
3722 service_arn: "arn:aws:ecs:us-east-1:000000000000:service/default/svc".into(),
3723 cluster_name: "default".into(),
3724 cluster_arn: "arn:aws:ecs:us-east-1:000000000000:cluster/default".into(),
3725 task_definition_arn: "arn:aws:ecs:us-east-1:000000000000:task-definition/app:1".into(),
3726 family: "app".into(),
3727 revision: 1,
3728 desired_count: 1,
3729 running_count: 0,
3730 pending_count: 0,
3731 launch_type: "FARGATE".into(),
3732 status: "ACTIVE".into(),
3733 scheduling_strategy: "REPLICA".into(),
3734 deployment_controller: "ECS".into(),
3735 minimum_healthy_percent: Some(0),
3736 maximum_percent: Some(200),
3737 circuit_breaker: None,
3738 deployments: Vec::new(),
3739 load_balancers: vec![serde_json::json!({
3740 "targetGroupArn": "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc",
3741 "containerName": "app",
3742 "containerPort": 80,
3743 })],
3744 service_registries: Vec::new(),
3745 placement_constraints: Vec::new(),
3746 placement_strategy: Vec::new(),
3747 network_configuration: None,
3748 volume_configurations: vec![],
3749 tags: Vec::new(),
3750 created_at: Utc::now(),
3751 created_by: None,
3752 role_arn: None,
3753 platform_version: None,
3754 health_check_grace_period_seconds: None,
3755 enable_execute_command: false,
3756 enable_ecs_managed_tags: false,
3757 propagate_tags: None,
3758 capacity_provider_strategy: Vec::new(),
3759 availability_zone_rebalancing: None,
3760 };
3761 acct.services.insert(
3762 crate::state::EcsState::service_key("default", "svc"),
3763 service,
3764 );
3765
3766 let mut task = make_task("t1");
3767 task.group = Some("service:svc".into());
3768 task.attachments = vec![crate::state::TaskAttachment {
3769 id: "eni-123".into(),
3770 attachment_type: "eni".into(),
3771 status: "ATTACHED".into(),
3772 details: vec![
3773 crate::state::AttachmentDetail {
3774 name: "privateIPv4Address".into(),
3775 value: "172.18.0.2".into(),
3776 },
3777 crate::state::AttachmentDetail {
3778 name: "macAddress".into(),
3779 value: "02:42:ac:12:00:02".into(),
3780 },
3781 ],
3782 }];
3783 acct.tasks.insert("t1".into(), task);
3784
3785 let state = acct.clone();
3786 let targets = compute_elbv2_targets(&state, state.tasks.get("t1").unwrap());
3787 assert_eq!(targets.len(), 1);
3788 let (arn, tg_targets) = &targets[0];
3789 assert_eq!(
3790 arn,
3791 "arn:aws:elasticloadbalancing:us-east-1:000000000000:targetgroup/tg/abc"
3792 );
3793 assert_eq!(tg_targets.len(), 1);
3794 assert_eq!(tg_targets[0].0, "172.18.0.2");
3795 assert_eq!(tg_targets[0].1, Some(80));
3796 }
3797}