1use std::sync::Arc;
4
5use async_trait::async_trait;
6use http::{Method, StatusCode};
7use serde_json::{json, Map, Value};
8use tokio::sync::Mutex as AsyncMutex;
9use uuid::Uuid;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::{SnapshotHook, SnapshotStore};
14
15use crate::state::{BatchSnapshot, SharedBatchState, BATCH_SNAPSHOT_SCHEMA_VERSION};
16
17const SUPPORTED_ACTIONS: &[&str] = &[
18 "CancelJob",
19 "CreateComputeEnvironment",
20 "CreateConsumableResource",
21 "CreateJobQueue",
22 "CreateQuotaShare",
23 "CreateSchedulingPolicy",
24 "CreateServiceEnvironment",
25 "DeleteComputeEnvironment",
26 "DeleteConsumableResource",
27 "DeleteJobQueue",
28 "DeleteQuotaShare",
29 "DeleteSchedulingPolicy",
30 "DeleteServiceEnvironment",
31 "DeregisterJobDefinition",
32 "DescribeComputeEnvironments",
33 "DescribeConsumableResource",
34 "DescribeJobDefinitions",
35 "DescribeJobQueues",
36 "DescribeJobs",
37 "DescribeQuotaShare",
38 "DescribeSchedulingPolicies",
39 "DescribeServiceEnvironments",
40 "DescribeServiceJob",
41 "GetJobQueueSnapshot",
42 "ListConsumableResources",
43 "ListJobs",
44 "ListJobsByConsumableResource",
45 "ListQuotaShares",
46 "ListSchedulingPolicies",
47 "ListServiceJobs",
48 "ListTagsForResource",
49 "RegisterJobDefinition",
50 "SubmitJob",
51 "SubmitServiceJob",
52 "TagResource",
53 "TerminateJob",
54 "TerminateServiceJob",
55 "UntagResource",
56 "UpdateComputeEnvironment",
57 "UpdateConsumableResource",
58 "UpdateJobQueue",
59 "UpdateQuotaShare",
60 "UpdateSchedulingPolicy",
61 "UpdateServiceEnvironment",
62 "UpdateServiceJob",
63];
64
65const MUTATING_ACTIONS: &[&str] = &[
67 "CreateComputeEnvironment",
68 "UpdateComputeEnvironment",
69 "DeleteComputeEnvironment",
70 "CreateJobQueue",
71 "UpdateJobQueue",
72 "DeleteJobQueue",
73 "RegisterJobDefinition",
74 "DeregisterJobDefinition",
75 "CreateSchedulingPolicy",
76 "UpdateSchedulingPolicy",
77 "DeleteSchedulingPolicy",
78 "SubmitJob",
79 "CancelJob",
80 "TerminateJob",
81 "TagResource",
82 "UntagResource",
83];
84
85pub struct BatchService {
86 state: SharedBatchState,
87 snapshot_store: Option<Arc<dyn SnapshotStore>>,
88 snapshot_lock: Arc<AsyncMutex<()>>,
89 ecs_state: Option<fakecloud_ecs::SharedEcsState>,
93 ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
94}
95
96impl BatchService {
97 pub fn new(state: SharedBatchState) -> Self {
98 Self {
99 state,
100 snapshot_store: None,
101 snapshot_lock: Arc::new(AsyncMutex::new(())),
102 ecs_state: None,
103 ecs_runtime: None,
104 }
105 }
106
107 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
108 self.snapshot_store = Some(store);
109 self
110 }
111
112 pub fn with_ecs(
115 mut self,
116 state: fakecloud_ecs::SharedEcsState,
117 runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
118 ) -> Self {
119 self.ecs_state = Some(state);
120 self.ecs_runtime = runtime;
121 self
122 }
123
124 async fn save_snapshot(&self) {
125 let Some(store) = self.snapshot_store.clone() else {
126 return;
127 };
128 let _guard = self.snapshot_lock.lock().await;
129 let bytes = {
130 let snap = BatchSnapshot {
131 schema_version: BATCH_SNAPSHOT_SCHEMA_VERSION,
132 accounts: Some(self.state.read().clone()),
133 };
134 serde_json::to_vec(&snap).unwrap_or_default()
135 };
136 let _ = tokio::task::spawn_blocking(move || store.save(&bytes)).await;
137 }
138
139 pub fn snapshot_hook(&self) -> Option<SnapshotHook> {
141 let store = self.snapshot_store.clone()?;
142 let state = self.state.clone();
143 let lock = self.snapshot_lock.clone();
144 Some(Arc::new(move || {
145 let store = store.clone();
146 let state = state.clone();
147 let lock = lock.clone();
148 Box::pin(async move {
149 let _guard = lock.lock().await;
150 let bytes = {
151 let snap = BatchSnapshot {
152 schema_version: BATCH_SNAPSHOT_SCHEMA_VERSION,
153 accounts: Some(state.read().clone()),
154 };
155 serde_json::to_vec(&snap).unwrap_or_default()
156 };
157 let _ = tokio::task::spawn_blocking(move || store.save(&bytes)).await;
158 })
159 }))
160 }
161
162 pub async fn reconcile_persisted_jobs(&self) {
169 const NON_TERMINAL: &[&str] = &["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"];
170 let now = chrono::Utc::now().timestamp_millis();
171 let mut changed = false;
172 {
173 let mut accounts = self.state.write();
174 for acct in accounts.accounts.values_mut() {
175 for job in acct.jobs.values_mut() {
176 let Some(o) = job.as_object_mut() else {
177 continue;
178 };
179 let st = o.get("status").and_then(Value::as_str).unwrap_or("");
180 if NON_TERMINAL.contains(&st) {
181 o.insert("status".into(), json!("FAILED"));
182 o.insert(
183 "statusReason".into(),
184 json!("Job interrupted by a fakecloud restart"),
185 );
186 o.entry("stoppedAt".to_string())
187 .or_insert_with(|| json!(now));
188 changed = true;
189 }
190 }
191 }
192 }
193 if changed {
194 self.save_snapshot().await;
195 }
196 }
197
198 fn resolve_action(req: &AwsRequest) -> Option<&'static str> {
201 let segs = &req.path_segments;
202 if segs.first().map(|s| s.as_str()) != Some("v1") {
203 return None;
204 }
205 if segs.get(1).map(|s| s.as_str()) == Some("tags") {
207 return match req.method {
208 Method::GET => Some("ListTagsForResource"),
209 Method::POST => Some("TagResource"),
210 Method::DELETE => Some("UntagResource"),
211 _ => None,
212 };
213 }
214 let op = segs.get(1)?.as_str();
215 Some(match op {
216 "canceljob" => "CancelJob",
217 "createcomputeenvironment" => "CreateComputeEnvironment",
218 "createconsumableresource" => "CreateConsumableResource",
219 "createjobqueue" => "CreateJobQueue",
220 "createquotashare" => "CreateQuotaShare",
221 "createschedulingpolicy" => "CreateSchedulingPolicy",
222 "createserviceenvironment" => "CreateServiceEnvironment",
223 "deletecomputeenvironment" => "DeleteComputeEnvironment",
224 "deleteconsumableresource" => "DeleteConsumableResource",
225 "deletejobqueue" => "DeleteJobQueue",
226 "deletequotashare" => "DeleteQuotaShare",
227 "deleteschedulingpolicy" => "DeleteSchedulingPolicy",
228 "deleteserviceenvironment" => "DeleteServiceEnvironment",
229 "deregisterjobdefinition" => "DeregisterJobDefinition",
230 "describecomputeenvironments" => "DescribeComputeEnvironments",
231 "describeconsumableresource" => "DescribeConsumableResource",
232 "describejobdefinitions" => "DescribeJobDefinitions",
233 "describejobqueues" => "DescribeJobQueues",
234 "describejobs" => "DescribeJobs",
235 "describequotashare" => "DescribeQuotaShare",
236 "describeschedulingpolicies" => "DescribeSchedulingPolicies",
237 "describeserviceenvironments" => "DescribeServiceEnvironments",
238 "describeservicejob" => "DescribeServiceJob",
239 "getjobqueuesnapshot" => "GetJobQueueSnapshot",
240 "listconsumableresources" => "ListConsumableResources",
241 "listjobs" => "ListJobs",
242 "listjobsbyconsumableresource" => "ListJobsByConsumableResource",
243 "listquotashares" => "ListQuotaShares",
244 "listschedulingpolicies" => "ListSchedulingPolicies",
245 "listservicejobs" => "ListServiceJobs",
246 "registerjobdefinition" => "RegisterJobDefinition",
247 "submitjob" => "SubmitJob",
248 "submitservicejob" => "SubmitServiceJob",
249 "terminatejob" => "TerminateJob",
250 "terminateservicejob" => "TerminateServiceJob",
251 "updatecomputeenvironment" => "UpdateComputeEnvironment",
252 "updateconsumableresource" => "UpdateConsumableResource",
253 "updatejobqueue" => "UpdateJobQueue",
254 "updatequotashare" => "UpdateQuotaShare",
255 "updateschedulingpolicy" => "UpdateSchedulingPolicy",
256 "updateserviceenvironment" => "UpdateServiceEnvironment",
257 "updateservicejob" => "UpdateServiceJob",
258 _ => return None,
259 })
260 }
261
262 fn dispatch(&self, action: &str, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
263 match action {
264 "CreateComputeEnvironment" => self.create_compute_environment(req),
265 "DescribeComputeEnvironments" => self.describe_compute_environments(req),
266 "DeleteComputeEnvironment" => self.delete_compute_environment(req),
267 "CreateJobQueue" => self.create_job_queue(req),
268 "DescribeJobQueues" => self.describe_job_queues(req),
269 "DeleteJobQueue" => self.delete_job_queue(req),
270 "UpdateComputeEnvironment" => self.update_compute_environment(req),
271 "UpdateJobQueue" => self.update_job_queue(req),
272 "RegisterJobDefinition" => self.register_job_definition(req),
273 "DescribeJobDefinitions" => self.describe_job_definitions(req),
274 "DeregisterJobDefinition" => self.deregister_job_definition(req),
275 "CreateSchedulingPolicy" => self.create_scheduling_policy(req),
276 "DescribeSchedulingPolicies" => self.describe_scheduling_policies(req),
277 "ListSchedulingPolicies" => self.list_scheduling_policies(req),
278 "UpdateSchedulingPolicy" => self.update_scheduling_policy(req),
279 "DeleteSchedulingPolicy" => self.delete_scheduling_policy(req),
280 "DescribeJobs" => self.describe_jobs(req),
281 "ListJobs" => self.list_jobs(req),
282 "CancelJob" => self.cancel_job(req),
283 "TerminateJob" => self.terminate_job(req),
284 "TagResource" => self.tag_resource(req),
285 "UntagResource" => self.untag_resource(req),
286 "ListTagsForResource" => self.list_tags_for_resource(req),
287 other => Err(AwsServiceError::action_not_implemented("batch", other)),
288 }
289 }
290}
291
292fn obj(v: &Value) -> Map<String, Value> {
293 v.as_object().cloned().unwrap_or_default()
294}
295
296fn client_error(code: &str, msg: impl Into<String>) -> AwsServiceError {
297 AwsServiceError::aws_error(StatusCode::BAD_REQUEST, code, msg.into())
298}
299
300type TagStore = std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>;
301
302fn seed_inline_tags(tags: &mut TagStore, arn: &str, stored: &Map<String, Value>) {
305 if let Some(inline) = stored.get("tags").and_then(Value::as_object) {
306 let entry = tags.entry(arn.to_string()).or_default();
307 for (k, v) in inline {
308 if let Some(s) = v.as_str() {
309 entry.insert(k.clone(), s.to_string());
310 }
311 }
312 }
313}
314
315fn merge_tag_overlay(resource: &Value, arn_key: &str, tags: &TagStore) -> Value {
319 let mut o = obj(resource);
320 if let Some(arn) = o.get(arn_key).and_then(Value::as_str).map(String::from) {
321 if let Some(t) = tags.get(&arn) {
322 o.insert(
323 "tags".into(),
324 Value::Object(t.iter().map(|(k, v)| (k.clone(), json!(v))).collect()),
325 );
326 }
327 }
328 Value::Object(o)
329}
330
331fn job_summary(j: &Value) -> Value {
335 let mut s = serde_json::Map::new();
336 for key in [
337 "jobId",
338 "jobArn",
339 "jobName",
340 "createdAt",
341 "status",
342 "statusReason",
343 "startedAt",
344 "stoppedAt",
345 "jobDefinition",
346 "container",
347 "arrayProperties",
348 "nodeProperties",
349 ] {
350 if let Some(v) = j.get(key) {
351 s.insert(key.to_string(), v.clone());
352 }
353 }
354 Value::Object(s)
355}
356
357impl BatchService {
358 fn arn(&self, account: &str, region: &str, resource: &str) -> String {
359 Arn::new("batch", region, account, resource).to_string()
360 }
361
362 fn create_compute_environment(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
365 let body = req.json_body();
366 let name = body
367 .get("computeEnvironmentName")
368 .and_then(Value::as_str)
369 .ok_or_else(|| client_error("ClientException", "computeEnvironmentName is required"))?
370 .to_string();
371 let arn = self.arn(
372 &req.account_id,
373 &req.region,
374 &format!("compute-environment/{name}"),
375 );
376 let mut stored = obj(&body);
377 stored.insert("computeEnvironmentArn".into(), json!(arn));
378 stored.insert("status".into(), json!("VALID"));
379 stored.insert("statusReason".into(), json!("ComputeEnvironment Healthy"));
380 stored
381 .entry("state".to_string())
382 .or_insert_with(|| json!("ENABLED"));
383 let uuid = Uuid::new_v4().to_string();
384 stored.insert(
387 "ecsClusterArn".into(),
388 json!(format!(
389 "arn:aws:ecs:{}:{}:cluster/AWSBatch-{name}-{uuid}",
390 req.region, req.account_id
391 )),
392 );
393 stored.insert("uuid".into(), json!(uuid));
394
395 let mut accounts = self.state.write();
396 let st = accounts.get_or_create(&req.account_id);
397 if st.compute_environments.contains_key(&name) {
398 return Err(client_error(
399 "ClientException",
400 format!("Object already exists: {name}"),
401 ));
402 }
403 seed_inline_tags(&mut st.tags, &arn, &stored);
404 st.compute_environments
405 .insert(name.clone(), Value::Object(stored));
406 Ok(AwsResponse::ok_json(json!({
407 "computeEnvironmentName": name,
408 "computeEnvironmentArn": arn,
409 })))
410 }
411
412 fn describe_compute_environments(
413 &self,
414 req: &AwsRequest,
415 ) -> Result<AwsResponse, AwsServiceError> {
416 let body = req.json_body();
417 let wanted = string_set(&body, "computeEnvironments");
418 let accounts = self.state.read();
419 let items: Vec<Value> = accounts
420 .get(&req.account_id)
421 .map(|st| {
422 st.compute_environments
423 .values()
424 .filter(|ce| {
425 match_named(
426 ce,
427 &wanted,
428 "computeEnvironmentName",
429 "computeEnvironmentArn",
430 )
431 })
432 .map(|ce| {
433 let mut v = merge_tag_overlay(ce, "computeEnvironmentArn", &st.tags);
434 if let Some(o) = v.as_object_mut() {
436 o.entry("containerOrchestrationType".to_string())
437 .or_insert_with(|| json!("ECS"));
438 }
439 v
440 })
441 .collect()
442 })
443 .unwrap_or_default();
444 Ok(AwsResponse::ok_json(
445 json!({ "computeEnvironments": items }),
446 ))
447 }
448
449 fn delete_compute_environment(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
450 let body = req.json_body();
451 let name = arn_or_name(&body, "computeEnvironment")?;
452 let mut accounts = self.state.write();
453 accounts
454 .get_or_create(&req.account_id)
455 .compute_environments
456 .remove(&name);
457 Ok(AwsResponse::ok_json(json!({})))
458 }
459
460 fn create_job_queue(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
463 let body = req.json_body();
464 let name = body
465 .get("jobQueueName")
466 .and_then(Value::as_str)
467 .ok_or_else(|| client_error("ClientException", "jobQueueName is required"))?
468 .to_string();
469 let arn = self.arn(&req.account_id, &req.region, &format!("job-queue/{name}"));
470 let mut stored = obj(&body);
471 stored.insert("jobQueueArn".into(), json!(arn));
472 stored.insert("status".into(), json!("VALID"));
473 stored.insert("statusReason".into(), json!("JobQueue Healthy"));
474 stored
475 .entry("state".to_string())
476 .or_insert_with(|| json!("ENABLED"));
477
478 let mut accounts = self.state.write();
479 let st = accounts.get_or_create(&req.account_id);
480 if st.job_queues.contains_key(&name) {
481 return Err(client_error(
482 "ClientException",
483 format!("Object already exists: {name}"),
484 ));
485 }
486 seed_inline_tags(&mut st.tags, &arn, &stored);
487 st.job_queues.insert(name.clone(), Value::Object(stored));
488 Ok(AwsResponse::ok_json(json!({
489 "jobQueueName": name,
490 "jobQueueArn": arn,
491 })))
492 }
493
494 fn describe_job_queues(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
495 let body = req.json_body();
496 let wanted = string_set(&body, "jobQueues");
497 let accounts = self.state.read();
498 let items: Vec<Value> = accounts
499 .get(&req.account_id)
500 .map(|st| {
501 st.job_queues
502 .values()
503 .filter(|q| match_named(q, &wanted, "jobQueueName", "jobQueueArn"))
504 .map(|q| merge_tag_overlay(q, "jobQueueArn", &st.tags))
505 .collect()
506 })
507 .unwrap_or_default();
508 Ok(AwsResponse::ok_json(json!({ "jobQueues": items })))
509 }
510
511 fn delete_job_queue(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
512 let body = req.json_body();
513 let name = arn_or_name(&body, "jobQueue")?;
514 let mut accounts = self.state.write();
515 accounts
516 .get_or_create(&req.account_id)
517 .job_queues
518 .remove(&name);
519 Ok(AwsResponse::ok_json(json!({})))
520 }
521
522 fn register_job_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
525 let body = req.json_body();
526 let name = body
527 .get("jobDefinitionName")
528 .and_then(Value::as_str)
529 .ok_or_else(|| client_error("ClientException", "jobDefinitionName is required"))?
530 .to_string();
531 let mut accounts = self.state.write();
532 let st = accounts.get_or_create(&req.account_id);
533 let revision = st.job_def_revisions.entry(name.clone()).or_insert(0);
534 *revision += 1;
535 let revision = *revision;
536 let arn = self.arn(
537 &req.account_id,
538 &req.region,
539 &format!("job-definition/{name}:{revision}"),
540 );
541 let mut stored = obj(&body);
542 stored.insert("jobDefinitionArn".into(), json!(arn));
543 stored.insert("revision".into(), json!(revision));
544 stored.insert("status".into(), json!("ACTIVE"));
545 if let Some(cp) = stored
549 .get_mut("containerProperties")
550 .and_then(Value::as_object_mut)
551 {
552 for key in [
553 "environment",
554 "mountPoints",
555 "resourceRequirements",
556 "secrets",
557 "ulimits",
558 "volumes",
559 ] {
560 cp.entry(key.to_string()).or_insert_with(|| json!([]));
561 }
562 }
563 seed_inline_tags(&mut st.tags, &arn, &stored);
564 st.job_definitions
565 .insert(format!("{name}:{revision}"), Value::Object(stored));
566 Ok(AwsResponse::ok_json(json!({
567 "jobDefinitionName": name,
568 "jobDefinitionArn": arn,
569 "revision": revision,
570 })))
571 }
572
573 fn describe_job_definitions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
574 let body = req.json_body();
575 let wanted = string_set(&body, "jobDefinitions");
576 let name_filter = body
577 .get("jobDefinitionName")
578 .and_then(Value::as_str)
579 .map(|s| s.to_string());
580 let status_filter = body
581 .get("status")
582 .and_then(Value::as_str)
583 .map(|s| s.to_string());
584 let accounts = self.state.read();
585 let items: Vec<Value> = accounts
586 .get(&req.account_id)
587 .map(|st| {
588 st.job_definitions
589 .values()
590 .filter(|jd| {
591 let arn_ok = wanted.is_empty()
593 || jd
594 .get("jobDefinitionArn")
595 .and_then(Value::as_str)
596 .map(|a| wanted.contains(a))
597 .unwrap_or(false)
598 || jd
599 .get("jobDefinitionName")
600 .and_then(Value::as_str)
601 .map(|n| wanted.contains(n))
602 .unwrap_or(false);
603 let name_ok = name_filter.as_deref().is_none_or(|n| {
604 jd.get("jobDefinitionName").and_then(Value::as_str) == Some(n)
605 });
606 let status_ok = status_filter
607 .as_deref()
608 .is_none_or(|s| jd.get("status").and_then(Value::as_str) == Some(s));
609 arn_ok && name_ok && status_ok
610 })
611 .map(|jd| merge_tag_overlay(jd, "jobDefinitionArn", &st.tags))
612 .collect()
613 })
614 .unwrap_or_default();
615 Ok(AwsResponse::ok_json(json!({ "jobDefinitions": items })))
616 }
617
618 fn deregister_job_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
619 let body = req.json_body();
620 let id = body
621 .get("jobDefinition")
622 .and_then(Value::as_str)
623 .ok_or_else(|| client_error("ClientException", "jobDefinition is required"))?;
624 let key = id.rsplit('/').next().unwrap_or(id).to_string();
626 let mut accounts = self.state.write();
627 let st = accounts.get_or_create(&req.account_id);
628 if let Some(jd) = st.job_definitions.get_mut(&key) {
629 if let Some(o) = jd.as_object_mut() {
630 o.insert("status".into(), json!("INACTIVE"));
631 }
632 }
633 Ok(AwsResponse::ok_json(json!({})))
634 }
635
636 fn update_compute_environment(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
637 let body = req.json_body();
638 let name = arn_or_name(&body, "computeEnvironment")?;
639 let mut accounts = self.state.write();
640 let st = accounts.get_or_create(&req.account_id);
641 let ce = st
642 .compute_environments
643 .get_mut(&name)
644 .ok_or_else(|| client_error("ClientException", format!("Object not found: {name}")))?;
645 let arn = merge_updates(
646 ce,
647 &body,
648 &[
649 "state",
650 "desiredvCpus",
651 "computeResources",
652 "serviceRole",
653 "updatePolicy",
654 ],
655 "computeEnvironmentArn",
656 );
657 Ok(AwsResponse::ok_json(json!({
658 "computeEnvironmentName": name,
659 "computeEnvironmentArn": arn,
660 })))
661 }
662
663 fn update_job_queue(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
664 let body = req.json_body();
665 let name = arn_or_name(&body, "jobQueue")?;
666 let mut accounts = self.state.write();
667 let st = accounts.get_or_create(&req.account_id);
668 let q = st
669 .job_queues
670 .get_mut(&name)
671 .ok_or_else(|| client_error("ClientException", format!("Object not found: {name}")))?;
672 let arn = merge_updates(
673 q,
674 &body,
675 &[
676 "state",
677 "priority",
678 "computeEnvironmentOrder",
679 "schedulingPolicyArn",
680 "jobStateTimeLimitActions",
681 ],
682 "jobQueueArn",
683 );
684 Ok(AwsResponse::ok_json(json!({
685 "jobQueueName": name,
686 "jobQueueArn": arn,
687 })))
688 }
689
690 fn create_scheduling_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
693 let body = req.json_body();
694 let name = body
695 .get("name")
696 .and_then(Value::as_str)
697 .ok_or_else(|| client_error("ClientException", "name is required"))?
698 .to_string();
699 let arn = self.arn(
700 &req.account_id,
701 &req.region,
702 &format!("scheduling-policy/{name}"),
703 );
704 let mut stored = obj(&body);
705 stored.insert("arn".into(), json!(arn));
706 let mut accounts = self.state.write();
707 let st = accounts.get_or_create(&req.account_id);
708 if st.scheduling_policies.contains_key(&name) {
709 return Err(client_error(
710 "ClientException",
711 format!("Object already exists: {name}"),
712 ));
713 }
714 seed_inline_tags(&mut st.tags, &arn, &stored);
715 st.scheduling_policies
716 .insert(name.clone(), Value::Object(stored));
717 Ok(AwsResponse::ok_json(json!({ "name": name, "arn": arn })))
718 }
719
720 fn describe_scheduling_policies(
721 &self,
722 req: &AwsRequest,
723 ) -> Result<AwsResponse, AwsServiceError> {
724 let body = req.json_body();
725 let wanted = string_set(&body, "arns");
726 let accounts = self.state.read();
727 let items: Vec<Value> = accounts
728 .get(&req.account_id)
729 .map(|st| {
730 st.scheduling_policies
731 .values()
732 .filter(|p| {
733 wanted.is_empty()
734 || p.get("arn")
735 .and_then(Value::as_str)
736 .map(|a| wanted.contains(a))
737 .unwrap_or(false)
738 })
739 .map(|p| merge_tag_overlay(p, "arn", &st.tags))
740 .collect()
741 })
742 .unwrap_or_default();
743 Ok(AwsResponse::ok_json(json!({ "schedulingPolicies": items })))
744 }
745
746 fn list_scheduling_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
747 let accounts = self.state.read();
748 let items: Vec<Value> = accounts
749 .get(&req.account_id)
750 .map(|st| {
751 st.scheduling_policies
752 .values()
753 .filter_map(|p| p.get("arn").map(|a| json!({ "arn": a })))
754 .collect()
755 })
756 .unwrap_or_default();
757 Ok(AwsResponse::ok_json(json!({ "schedulingPolicies": items })))
758 }
759
760 fn update_scheduling_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
761 let body = req.json_body();
762 let name = arn_or_name(&body, "arn")?;
763 let mut accounts = self.state.write();
764 let st = accounts.get_or_create(&req.account_id);
765 let p = st
766 .scheduling_policies
767 .get_mut(&name)
768 .ok_or_else(|| client_error("ClientException", format!("Object not found: {name}")))?;
769 merge_updates(p, &body, &["fairsharePolicy"], "arn");
770 Ok(AwsResponse::ok_json(json!({})))
771 }
772
773 fn delete_scheduling_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
774 let body = req.json_body();
775 let name = arn_or_name(&body, "arn")?;
776 let mut accounts = self.state.write();
777 accounts
778 .get_or_create(&req.account_id)
779 .scheduling_policies
780 .remove(&name);
781 Ok(AwsResponse::ok_json(json!({})))
782 }
783
784 async fn submit_job(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
787 let body = req.json_body();
788 let job_name = body
789 .get("jobName")
790 .and_then(Value::as_str)
791 .ok_or_else(|| client_error("ClientException", "jobName is required"))?
792 .to_string();
793 let job_queue = body
794 .get("jobQueue")
795 .and_then(Value::as_str)
796 .ok_or_else(|| client_error("ClientException", "jobQueue is required"))?
797 .to_string();
798 {
800 let queue_name = job_queue.rsplit('/').next().unwrap_or(&job_queue);
801 let accounts = self.state.read();
802 let exists = accounts.get(&req.account_id).is_some_and(|st| {
803 st.job_queues.contains_key(queue_name)
804 || st.job_queues.values().any(|q| {
805 q.get("jobQueueArn").and_then(Value::as_str) == Some(job_queue.as_str())
806 })
807 });
808 if !exists {
809 return Err(client_error(
810 "ClientException",
811 format!("Job queue {job_queue} does not exist"),
812 ));
813 }
814 }
815 let job_definition = body
816 .get("jobDefinition")
817 .and_then(Value::as_str)
818 .ok_or_else(|| client_error("ClientException", "jobDefinition is required"))?
819 .to_string();
820 let job_id = Uuid::new_v4().to_string();
821 let arn = self.arn(&req.account_id, &req.region, &format!("job/{job_id}"));
822 let now = chrono::Utc::now().timestamp_millis();
823 let array_size = match body
827 .pointer("/arrayProperties/size")
828 .and_then(Value::as_i64)
829 {
830 Some(n) if (2..=10_000).contains(&n) => Some(n),
831 Some(n) => {
832 return Err(client_error(
833 "ClientException",
834 format!("Array job size must be between 2 and 10000, but was {n}"),
835 ));
836 }
837 None => None,
838 };
839 let depends_on: Vec<String> = body
840 .get("dependsOn")
841 .and_then(Value::as_array)
842 .map(|a| {
843 a.iter()
844 .filter_map(|d| d.get("jobId").and_then(Value::as_str).map(String::from))
845 .collect()
846 })
847 .unwrap_or_default();
848
849 let mut job = obj(&body);
850 job.insert("jobId".into(), json!(job_id));
851 job.insert("jobArn".into(), json!(arn));
852 job.insert("jobName".into(), json!(job_name));
853 job.insert("jobQueue".into(), json!(job_queue));
854 job.insert("jobDefinition".into(), json!(job_definition));
855 job.insert("createdAt".into(), json!(now));
856
857 let container = self.resolve_container(&req.account_id, &job_definition, &body);
860
861 if let Some(size) = array_size {
862 job.insert("status".into(), json!("PENDING"));
867 job.insert("arrayProperties".into(), json!({ "size": size }));
868 {
869 let mut accounts = self.state.write();
870 accounts
871 .get_or_create(&req.account_id)
872 .jobs
873 .insert(job_id.clone(), Value::Object(job));
874 }
875 for index in 0..size {
876 let child_id = format!("{job_id}:{index}");
877 let child_arn = self.arn(&req.account_id, &req.region, &format!("job/{child_id}"));
878 let mut child = serde_json::Map::new();
879 child.insert("jobId".into(), json!(child_id));
880 child.insert("jobArn".into(), json!(child_arn));
881 child.insert("jobName".into(), json!(job_name));
882 child.insert("jobQueue".into(), json!(job_queue));
883 child.insert("jobDefinition".into(), json!(job_definition));
884 child.insert("status".into(), json!("SUBMITTED"));
885 child.insert("createdAt".into(), json!(now));
886 child.insert(
887 "arrayProperties".into(),
888 json!({ "index": index, "statusSummary": {} }),
889 );
890 {
891 let mut accounts = self.state.write();
892 accounts
893 .get_or_create(&req.account_id)
894 .jobs
895 .insert(child_id.clone(), Value::Object(child));
896 }
897 let child_container = container.clone().map(|c| with_array_index_env(c, index));
898 self.launch_job(req, &child_id, &job_name, child_container, now)
899 .await;
900 }
901 } else if !depends_on.is_empty() {
902 job.insert("status".into(), json!("PENDING"));
906 {
907 let mut accounts = self.state.write();
908 accounts
909 .get_or_create(&req.account_id)
910 .jobs
911 .insert(job_id.clone(), Value::Object(job));
912 }
913 spawn_dependency_waiter(
914 self.launch_ctx(),
915 req.account_id.clone(),
916 req.region.clone(),
917 req.request_id.clone(),
918 job_id.clone(),
919 job_name.clone(),
920 container,
921 depends_on,
922 now,
923 );
924 } else {
925 job.insert("status".into(), json!("SUBMITTED"));
926 {
927 let mut accounts = self.state.write();
928 accounts
929 .get_or_create(&req.account_id)
930 .jobs
931 .insert(job_id.clone(), Value::Object(job));
932 }
933 self.launch_job(req, &job_id, &job_name, container, now)
934 .await;
935 }
936
937 Ok(AwsResponse::ok_json(json!({
938 "jobArn": arn,
939 "jobName": job_name,
940 "jobId": job_id,
941 })))
942 }
943
944 fn launch_ctx(&self) -> LaunchCtx {
951 LaunchCtx {
952 batch_state: self.state.clone(),
953 ecs_state: self.ecs_state.clone(),
954 ecs_runtime: self.ecs_runtime.clone(),
955 snapshot_store: self.snapshot_store.clone(),
956 snapshot_lock: self.snapshot_lock.clone(),
957 }
958 }
959
960 async fn launch_job(
961 &self,
962 req: &AwsRequest,
963 job_id: &str,
964 job_name: &str,
965 container: Option<Value>,
966 now: i64,
967 ) {
968 launch(
969 &self.launch_ctx(),
970 &req.account_id,
971 &req.region,
972 &req.request_id,
973 job_id,
974 job_name,
975 container,
976 now,
977 )
978 .await;
979 }
980
981 fn resolve_container(
985 &self,
986 account_id: &str,
987 job_definition: &str,
988 submit_body: &Value,
989 ) -> Option<Value> {
990 let key = job_definition.rsplit('/').next().unwrap_or(job_definition);
991 let accounts = self.state.read();
992 let st = accounts.get(account_id)?;
993 let jd = if key.contains(':') {
995 st.job_definitions.get(key).cloned()
996 } else {
997 st.job_definitions
998 .iter()
999 .filter(|(k, _)| k.rsplit_once(':').map(|(n, _)| n) == Some(key))
1000 .max_by_key(|(k, _)| {
1001 k.rsplit_once(':')
1002 .and_then(|(_, r)| r.parse::<i64>().ok())
1003 .unwrap_or(0)
1004 })
1005 .map(|(_, v)| v.clone())
1006 }?;
1007 let mut container = jd.get("containerProperties")?.as_object()?.clone();
1008
1009 if let Some(ov) = submit_body
1010 .get("containerOverrides")
1011 .and_then(Value::as_object)
1012 {
1013 for f in ["command", "environment", "resourceRequirements"] {
1014 if let Some(v) = ov.get(f) {
1015 container.insert(f.to_string(), v.clone());
1016 }
1017 }
1018 }
1019 Some(Value::Object(container))
1020 }
1021
1022 fn describe_jobs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1023 let body = req.json_body();
1024 let wanted = string_set(&body, "jobs");
1025 let accounts = self.state.read();
1026 let items: Vec<Value> = accounts
1027 .get(&req.account_id)
1028 .map(|st| {
1029 st.jobs
1030 .values()
1031 .filter(|j| {
1032 wanted.is_empty()
1033 || j.get("jobId")
1034 .and_then(Value::as_str)
1035 .map(|id| wanted.contains(id))
1036 .unwrap_or(false)
1037 })
1038 .map(|j| {
1039 let id = j.get("jobId").and_then(Value::as_str).unwrap_or("");
1043 let is_parent = j.pointer("/arrayProperties/size").is_some();
1044 if !is_parent {
1045 return j.clone();
1046 }
1047 let prefix = format!("{id}:");
1048 let children: Vec<&Value> = st
1049 .jobs
1050 .iter()
1051 .filter(|(k, _)| k.starts_with(&prefix))
1052 .map(|(_, v)| v)
1053 .collect();
1054 let (summary, status) = array_status_summary(&children);
1055 let mut out = j.clone();
1056 if let Some(o) = out.as_object_mut() {
1057 o.insert("status".into(), json!(status));
1058 if let Some(ap) =
1059 o.get_mut("arrayProperties").and_then(|v| v.as_object_mut())
1060 {
1061 ap.insert("statusSummary".into(), summary);
1062 }
1063 }
1064 out
1065 })
1066 .collect()
1067 })
1068 .unwrap_or_default();
1069 Ok(AwsResponse::ok_json(json!({ "jobs": items })))
1070 }
1071
1072 fn list_jobs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1073 let body = req.json_body();
1074 let queue = body.get("jobQueue").and_then(Value::as_str);
1075 let array_job_id = body.get("arrayJobId").and_then(Value::as_str);
1076 let multi_node_job_id = body.get("multiNodeJobId").and_then(Value::as_str);
1077 let selectors = [queue, array_job_id, multi_node_job_id]
1079 .iter()
1080 .filter(|s| s.is_some())
1081 .count();
1082 if selectors != 1 {
1083 return Err(client_error(
1084 "ClientException",
1085 "The ListJobs request must specify exactly one of jobQueue, arrayJobId, or multiNodeJobId",
1086 ));
1087 }
1088 let status = body
1090 .get("jobStatus")
1091 .and_then(Value::as_str)
1092 .unwrap_or("RUNNING")
1093 .to_string();
1094 let max_results = body
1095 .get("maxResults")
1096 .and_then(Value::as_i64)
1097 .filter(|n| *n > 0)
1098 .map(|n| n.min(100) as usize)
1099 .unwrap_or(100);
1100 let start: usize = body
1101 .get("nextToken")
1102 .and_then(Value::as_str)
1103 .and_then(|t| t.parse().ok())
1104 .unwrap_or(0);
1105
1106 let accounts = self.state.read();
1107 let mut matched: Vec<&Value> = accounts
1108 .get(&req.account_id)
1109 .map(|st| {
1110 st.jobs
1111 .values()
1112 .filter(|j| {
1113 let selector_ok = if let Some(q) = queue {
1114 j.get("jobQueue").and_then(Value::as_str) == Some(q)
1115 } else if let Some(a) = array_job_id {
1116 j.get("jobId")
1117 .and_then(Value::as_str)
1118 .is_some_and(|id| id.starts_with(&format!("{a}:")))
1119 } else if let Some(m) = multi_node_job_id {
1120 j.get("jobId")
1121 .and_then(Value::as_str)
1122 .is_some_and(|id| id.starts_with(&format!("{m}#")))
1123 } else {
1124 false
1125 };
1126 selector_ok
1127 && j.get("status").and_then(Value::as_str) == Some(status.as_str())
1128 })
1129 .collect()
1130 })
1131 .unwrap_or_default();
1132 matched.sort_by(|a, b| {
1134 let ka = a.get("createdAt").and_then(Value::as_i64).unwrap_or(0);
1135 let kb = b.get("createdAt").and_then(Value::as_i64).unwrap_or(0);
1136 kb.cmp(&ka).then_with(|| {
1137 a.get("jobId")
1138 .and_then(Value::as_str)
1139 .cmp(&b.get("jobId").and_then(Value::as_str))
1140 })
1141 });
1142 let total = matched.len();
1143 let items: Vec<Value> = matched
1144 .into_iter()
1145 .skip(start)
1146 .take(max_results)
1147 .map(job_summary)
1148 .collect();
1149 let mut resp = serde_json::Map::new();
1150 resp.insert("jobSummaryList".into(), Value::Array(items));
1151 let next = start + max_results;
1152 if next < total {
1153 resp.insert("nextToken".into(), json!(next.to_string()));
1154 }
1155 Ok(AwsResponse::ok_json(Value::Object(resp)))
1156 }
1157
1158 fn cancel_job(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159 self.stop_job(req, &["SUBMITTED", "PENDING", "RUNNABLE"], "CancelJob")
1160 }
1161
1162 fn terminate_job(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1163 self.stop_job(
1164 req,
1165 &["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"],
1166 "TerminateJob",
1167 )
1168 }
1169
1170 fn stop_job(
1171 &self,
1172 req: &AwsRequest,
1173 cancelable: &[&str],
1174 op: &str,
1175 ) -> Result<AwsResponse, AwsServiceError> {
1176 let body = req.json_body();
1177 let job_id = body
1178 .get("jobId")
1179 .and_then(Value::as_str)
1180 .ok_or_else(|| client_error("ClientException", "jobId is required"))?
1181 .to_string();
1182 let reason = body
1183 .get("reason")
1184 .and_then(Value::as_str)
1185 .unwrap_or(op)
1186 .to_string();
1187 let mut accounts = self.state.write();
1188 if let Some(job) = accounts
1189 .get_or_create(&req.account_id)
1190 .jobs
1191 .get_mut(&job_id)
1192 {
1193 if let Some(o) = job.as_object_mut() {
1194 let cur = o.get("status").and_then(Value::as_str).unwrap_or("");
1195 if cancelable.contains(&cur) {
1196 o.insert("status".into(), json!("FAILED"));
1197 o.insert("statusReason".into(), json!(reason));
1198 }
1199 }
1200 }
1201 Ok(AwsResponse::ok_json(json!({})))
1202 }
1203
1204 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1207 let arn = req
1208 .path_segments
1209 .get(2)
1210 .map(|s| percent_decode(s))
1211 .ok_or_else(|| client_error("ClientException", "resourceArn is required"))?;
1212 let body = req.json_body();
1213 let tags = body
1214 .get("tags")
1215 .and_then(Value::as_object)
1216 .cloned()
1217 .unwrap_or_default();
1218 let mut accounts = self.state.write();
1219 let entry = accounts
1220 .get_or_create(&req.account_id)
1221 .tags
1222 .entry(arn)
1223 .or_default();
1224 for (k, v) in tags {
1225 if let Some(s) = v.as_str() {
1226 entry.insert(k, s.to_string());
1227 }
1228 }
1229 Ok(AwsResponse::ok_json(json!({})))
1230 }
1231
1232 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1233 let arn = req
1234 .path_segments
1235 .get(2)
1236 .map(|s| percent_decode(s))
1237 .ok_or_else(|| client_error("ClientException", "resourceArn is required"))?;
1238 let keys: Vec<String> = req
1239 .query_params
1240 .iter()
1241 .filter(|(k, _)| k.as_str() == "tagKeys" || k.starts_with("tagKeys"))
1242 .map(|(_, v)| v.clone())
1243 .collect();
1244 let mut accounts = self.state.write();
1245 if let Some(entry) = accounts.get_or_create(&req.account_id).tags.get_mut(&arn) {
1246 for k in keys {
1247 entry.remove(&k);
1248 }
1249 }
1250 Ok(AwsResponse::ok_json(json!({})))
1251 }
1252
1253 fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1254 let arn = req
1255 .path_segments
1256 .get(2)
1257 .map(|s| percent_decode(s))
1258 .ok_or_else(|| client_error("ClientException", "resourceArn is required"))?;
1259 let accounts = self.state.read();
1260 let tags = accounts
1261 .get(&req.account_id)
1262 .and_then(|st| st.tags.get(&arn))
1263 .map(|m| {
1264 m.iter()
1265 .map(|(k, v)| (k.clone(), json!(v)))
1266 .collect::<Map<String, Value>>()
1267 })
1268 .unwrap_or_default();
1269 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
1270 }
1271}
1272
1273fn merge_updates(stored: &mut Value, body: &Value, fields: &[&str], arn_key: &str) -> String {
1276 if let Some(o) = stored.as_object_mut() {
1277 for f in fields {
1278 if let Some(v) = body.get(*f) {
1279 o.insert((*f).to_string(), v.clone());
1280 }
1281 }
1282 o.get(arn_key)
1283 .and_then(Value::as_str)
1284 .unwrap_or_default()
1285 .to_string()
1286 } else {
1287 String::new()
1288 }
1289}
1290
1291#[derive(Clone)]
1294struct LaunchCtx {
1295 batch_state: SharedBatchState,
1296 ecs_state: Option<fakecloud_ecs::SharedEcsState>,
1297 ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
1298 snapshot_store: Option<Arc<dyn SnapshotStore>>,
1299 snapshot_lock: Arc<AsyncMutex<()>>,
1300}
1301
1302#[allow(clippy::too_many_arguments)]
1307async fn launch(
1308 ctx: &LaunchCtx,
1309 account: &str,
1310 region: &str,
1311 request_id: &str,
1312 job_id: &str,
1313 job_name: &str,
1314 container: Option<Value>,
1315 now: i64,
1316) {
1317 let (Some(ecs_state), Some(container)) = (ctx.ecs_state.clone(), container) else {
1318 return;
1319 };
1320 if container.get("image").and_then(Value::as_str).is_none() {
1321 return;
1322 }
1323 let src = bare_request(account, region, request_id);
1324 match launch_ecs_task(
1325 &ecs_state,
1326 &ctx.ecs_runtime,
1327 &src,
1328 job_id,
1329 job_name,
1330 &container,
1331 )
1332 .await
1333 {
1334 Ok((cluster, task_arn)) => {
1335 {
1336 let mut accounts = ctx.batch_state.write();
1337 if let Some(j) = accounts
1338 .get_or_create(account)
1339 .jobs
1340 .get_mut(job_id)
1341 .and_then(|j| j.as_object_mut())
1342 {
1343 j.insert("status".into(), json!("STARTING"));
1344 j.insert("ecsCluster".into(), json!(cluster));
1345 j.insert("ecsTaskArn".into(), json!(task_arn));
1346 j.insert("startedAt".into(), json!(now));
1347 }
1348 }
1349 spawn_status_sync(
1350 ctx,
1351 ecs_state,
1352 account.to_string(),
1353 region.to_string(),
1354 request_id.to_string(),
1355 job_id.to_string(),
1356 job_name.to_string(),
1357 cluster,
1358 task_arn,
1359 container,
1360 );
1361 }
1362 Err(err) => {
1363 let mut accounts = ctx.batch_state.write();
1364 if let Some(j) = accounts
1365 .get_or_create(account)
1366 .jobs
1367 .get_mut(job_id)
1368 .and_then(|j| j.as_object_mut())
1369 {
1370 j.insert("status".into(), json!("FAILED"));
1371 j.insert("statusReason".into(), json!(err.message().to_string()));
1372 }
1373 }
1374 }
1375}
1376
1377#[allow(clippy::too_many_arguments)]
1382fn spawn_dependency_waiter(
1383 ctx: LaunchCtx,
1384 account: String,
1385 region: String,
1386 request_id: String,
1387 job_id: String,
1388 job_name: String,
1389 container: Option<Value>,
1390 depends_on: Vec<String>,
1391 now: i64,
1392) {
1393 tokio::spawn(async move {
1394 for _ in 0..1800u32 {
1395 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1396 let statuses: Vec<String> = {
1397 let accounts = ctx.batch_state.read();
1398 let jobs = accounts.get(&account).map(|s| &s.jobs);
1399 depends_on
1400 .iter()
1401 .map(|d| {
1402 jobs.and_then(|m| m.get(d))
1403 .and_then(|j| j.get("status").and_then(Value::as_str))
1404 .unwrap_or("")
1405 .to_string()
1406 })
1407 .collect()
1408 };
1409 if statuses.iter().any(|s| s == "FAILED") {
1410 {
1411 let mut accounts = ctx.batch_state.write();
1412 if let Some(j) = accounts
1413 .get_or_create(&account)
1414 .jobs
1415 .get_mut(&job_id)
1416 .and_then(|j| j.as_object_mut())
1417 {
1418 j.insert("status".into(), json!("FAILED"));
1419 j.insert("statusReason".into(), json!("Dependent job failed"));
1420 }
1421 }
1422 save_snapshot_now(&ctx.batch_state, &ctx.snapshot_store, &ctx.snapshot_lock).await;
1423 return;
1424 }
1425 if statuses.iter().all(|s| s == "SUCCEEDED") {
1426 launch(
1427 &ctx,
1428 &account,
1429 ®ion,
1430 &request_id,
1431 &job_id,
1432 &job_name,
1433 container,
1434 now,
1435 )
1436 .await;
1437 return;
1438 }
1439 }
1440 });
1441}
1442
1443async fn launch_ecs_task(
1448 ecs_state: &fakecloud_ecs::SharedEcsState,
1449 ecs_runtime: &Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
1450 src: &AwsRequest,
1451 job_id: &str,
1452 job_name: &str,
1453 container: &Value,
1454) -> Result<(String, String), AwsServiceError> {
1455 let mut ecs = fakecloud_ecs::EcsService::new(ecs_state.clone());
1456 if let Some(rt) = ecs_runtime.clone() {
1457 ecs = ecs.with_runtime(rt);
1458 }
1459 let cluster = "fakecloud-batch".to_string();
1460 let _ = ecs
1461 .handle(ecs_request(
1462 "CreateCluster",
1463 json!({ "clusterName": cluster }),
1464 src,
1465 ))
1466 .await;
1467
1468 let image = container.get("image").and_then(Value::as_str).unwrap_or("");
1469 let (vcpus, memory) = container_resources(container);
1470 let mut cdef = serde_json::Map::new();
1471 cdef.insert("name".into(), json!("default"));
1472 cdef.insert("image".into(), json!(image));
1473 cdef.insert("essential".into(), json!(true));
1474 cdef.insert("cpu".into(), json!((vcpus * 1024.0).round() as i64));
1475 cdef.insert("memory".into(), json!(memory));
1476 if let Some(cmd) = container.get("command").filter(|v| v.is_array()) {
1477 cdef.insert("command".into(), cmd.clone());
1478 }
1479 if let Some(env) = container.get("environment").filter(|v| v.is_array()) {
1480 cdef.insert("environment".into(), env.clone());
1481 }
1482 let family = format!("batch-{job_name}");
1483 let reg = ecs
1484 .handle(ecs_request(
1485 "RegisterTaskDefinition",
1486 json!({
1487 "family": family,
1488 "containerDefinitions": [Value::Object(cdef)],
1489 "networkMode": "bridge",
1490 "requiresCompatibilities": ["EC2"],
1491 }),
1492 src,
1493 ))
1494 .await?;
1495 let reg_body: Value = parse_body(®);
1496 let task_def_arn = reg_body
1497 .pointer("/taskDefinition/taskDefinitionArn")
1498 .and_then(Value::as_str)
1499 .map(String::from)
1500 .unwrap_or(family);
1501
1502 let run = ecs
1503 .handle(ecs_request(
1504 "RunTask",
1505 json!({
1506 "cluster": cluster,
1507 "taskDefinition": task_def_arn,
1508 "count": 1,
1509 "launchType": "EC2",
1510 "startedBy": format!("batch:{job_id}"),
1511 }),
1512 src,
1513 ))
1514 .await?;
1515 let run_body: Value = parse_body(&run);
1516 let task_arn = run_body
1517 .pointer("/tasks/0/taskArn")
1518 .and_then(Value::as_str)
1519 .map(String::from)
1520 .ok_or_else(|| client_error("ServerException", "RunTask returned no task"))?;
1521 Ok((cluster, task_arn))
1522}
1523
1524#[allow(clippy::too_many_arguments)]
1530fn spawn_status_sync(
1531 ctx: &LaunchCtx,
1532 ecs_state: fakecloud_ecs::SharedEcsState,
1533 account_id: String,
1534 region: String,
1535 request_id: String,
1536 job_id: String,
1537 job_name: String,
1538 cluster: String,
1539 task_arn: String,
1540 container: Value,
1541) {
1542 let batch_state = ctx.batch_state.clone();
1543 let snapshot_store = ctx.snapshot_store.clone();
1544 let snapshot_lock = ctx.snapshot_lock.clone();
1545 let ecs_runtime = ctx.ecs_runtime.clone();
1546 tokio::spawn(async move {
1547 let ecs = fakecloud_ecs::EcsService::new(ecs_state.clone());
1548 let src = bare_request(&account_id, ®ion, &request_id);
1549 let (max_attempts, timeout_secs) = {
1551 let accounts = batch_state.read();
1552 let j = accounts.get(&account_id).and_then(|s| s.jobs.get(&job_id));
1553 let ma = j
1554 .and_then(|j| j.pointer("/retryStrategy/attempts"))
1555 .and_then(Value::as_i64)
1556 .unwrap_or(1)
1557 .clamp(1, 10);
1558 let to = j
1559 .and_then(|j| j.pointer("/timeout/attemptDurationSeconds"))
1560 .and_then(Value::as_i64)
1561 .filter(|t| *t > 0);
1562 (ma, to)
1563 };
1564 let max_polls = timeout_secs.unwrap_or(900).min(900) as u32;
1565 let mut task = task_arn;
1566 let mut attempt: i64 = 1;
1567 loop {
1568 let mut outcome: Option<(Option<i64>, Option<String>)> = None; let mut succeeded = false;
1571 for _ in 0..max_polls {
1572 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1573 let resp = match ecs
1574 .handle(ecs_request(
1575 "DescribeTasks",
1576 json!({ "cluster": cluster, "tasks": [task] }),
1577 &src,
1578 ))
1579 .await
1580 {
1581 Ok(r) => r,
1582 Err(_) => continue,
1583 };
1584 let body = parse_body(&resp);
1585 let Some(t) = body.pointer("/tasks/0") else {
1586 continue;
1587 };
1588 match t.get("lastStatus").and_then(Value::as_str).unwrap_or("") {
1589 "RUNNING" => {
1590 let mut accounts = batch_state.write();
1591 if let Some(j) = accounts
1592 .get_or_create(&account_id)
1593 .jobs
1594 .get_mut(&job_id)
1595 .and_then(|j| j.as_object_mut())
1596 {
1597 if j.get("status").and_then(Value::as_str) != Some("RUNNING") {
1598 j.insert("status".into(), json!("RUNNING"));
1599 }
1600 }
1601 }
1602 "STOPPED" => {
1603 let code = t.pointer("/containers/0/exitCode").and_then(Value::as_i64);
1604 let reason = t
1605 .get("stoppedReason")
1606 .and_then(Value::as_str)
1607 .map(String::from);
1608 if code == Some(0) {
1609 succeeded = true;
1610 } else {
1611 outcome =
1612 Some((code, reason.or(Some("Essential container exited".into()))));
1613 }
1614 break;
1615 }
1616 _ => continue,
1617 }
1618 }
1619
1620 if succeeded {
1621 set_job_terminal(
1622 &batch_state,
1623 &account_id,
1624 &job_id,
1625 "SUCCEEDED",
1626 Some(0),
1627 None,
1628 );
1629 save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1630 break;
1631 }
1632
1633 let (exit_code, reason) =
1635 outcome.unwrap_or((None, Some("Job attempt duration exceeded timeout".into())));
1636 if attempt < max_attempts {
1637 {
1639 let mut accounts = batch_state.write();
1640 if let Some(j) = accounts
1641 .get_or_create(&account_id)
1642 .jobs
1643 .get_mut(&job_id)
1644 .and_then(|j| j.as_object_mut())
1645 {
1646 let attempts = j.entry("attempts".to_string()).or_insert_with(|| json!([]));
1647 if let Some(a) = attempts.as_array_mut() {
1648 a.push(json!({
1649 "exitCode": exit_code,
1650 "statusReason": reason,
1651 }));
1652 }
1653 j.insert("status".into(), json!("RUNNABLE"));
1654 }
1655 }
1656 save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1657 match launch_ecs_task(
1658 &ecs_state,
1659 &ecs_runtime,
1660 &src,
1661 &job_id,
1662 &job_name,
1663 &container,
1664 )
1665 .await
1666 {
1667 Ok((_, new_task)) => {
1668 task = new_task;
1669 attempt += 1;
1670 let mut accounts = batch_state.write();
1671 if let Some(j) = accounts
1672 .get_or_create(&account_id)
1673 .jobs
1674 .get_mut(&job_id)
1675 .and_then(|j| j.as_object_mut())
1676 {
1677 j.insert("status".into(), json!("STARTING"));
1678 j.insert("ecsTaskArn".into(), json!(task));
1679 }
1680 continue;
1681 }
1682 Err(_) => {
1683 set_job_terminal(
1684 &batch_state,
1685 &account_id,
1686 &job_id,
1687 "FAILED",
1688 exit_code,
1689 reason,
1690 );
1691 save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1692 break;
1693 }
1694 }
1695 } else {
1696 set_job_terminal(
1697 &batch_state,
1698 &account_id,
1699 &job_id,
1700 "FAILED",
1701 exit_code,
1702 reason,
1703 );
1704 save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1705 break;
1706 }
1707 }
1708 });
1709}
1710
1711fn set_job_terminal(
1713 batch_state: &SharedBatchState,
1714 account_id: &str,
1715 job_id: &str,
1716 status: &str,
1717 exit_code: Option<i64>,
1718 reason: Option<String>,
1719) {
1720 let mut accounts = batch_state.write();
1721 if let Some(j) = accounts
1722 .get_or_create(account_id)
1723 .jobs
1724 .get_mut(job_id)
1725 .and_then(|j| j.as_object_mut())
1726 {
1727 j.insert("status".into(), json!(status));
1728 if let Some(c) = exit_code {
1729 let container = j
1730 .entry("container".to_string())
1731 .or_insert_with(|| json!({}));
1732 if let Some(o) = container.as_object_mut() {
1733 o.insert("exitCode".into(), json!(c));
1734 }
1735 }
1736 if let Some(r) = reason {
1737 j.insert("statusReason".into(), json!(r.clone()));
1738 if let Some(o) = j.get_mut("container").and_then(|v| v.as_object_mut()) {
1739 o.insert("reason".into(), json!(r));
1740 }
1741 }
1742 j.insert(
1743 "stoppedAt".into(),
1744 json!(chrono::Utc::now().timestamp_millis()),
1745 );
1746 }
1747}
1748
1749fn ecs_request(action: &str, body: Value, src: &AwsRequest) -> AwsRequest {
1752 AwsRequest {
1753 service: "ecs".to_string(),
1754 action: action.to_string(),
1755 region: src.region.clone(),
1756 account_id: src.account_id.clone(),
1757 request_id: src.request_id.clone(),
1758 headers: http::HeaderMap::new(),
1759 query_params: std::collections::HashMap::new(),
1760 body: bytes::Bytes::from(serde_json::to_vec(&body).unwrap_or_default()),
1761 body_stream: parking_lot::Mutex::new(None),
1762 path_segments: Vec::new(),
1763 raw_path: "/".to_string(),
1764 raw_query: String::new(),
1765 method: Method::POST,
1766 is_query_protocol: false,
1767 access_key_id: None,
1768 principal: None,
1769 }
1770}
1771
1772fn bare_request(account_id: &str, region: &str, request_id: &str) -> AwsRequest {
1775 AwsRequest {
1776 service: "batch".to_string(),
1777 action: String::new(),
1778 region: region.to_string(),
1779 account_id: account_id.to_string(),
1780 request_id: request_id.to_string(),
1781 headers: http::HeaderMap::new(),
1782 query_params: std::collections::HashMap::new(),
1783 body: bytes::Bytes::new(),
1784 body_stream: parking_lot::Mutex::new(None),
1785 path_segments: Vec::new(),
1786 raw_path: "/".to_string(),
1787 raw_query: String::new(),
1788 method: Method::POST,
1789 is_query_protocol: false,
1790 access_key_id: None,
1791 principal: None,
1792 }
1793}
1794
1795fn parse_body(resp: &AwsResponse) -> Value {
1797 serde_json::from_slice(resp.body.expect_bytes()).unwrap_or(Value::Null)
1798}
1799
1800fn container_resources(container: &Value) -> (f64, i64) {
1803 let mut vcpus = container
1804 .get("vcpus")
1805 .and_then(Value::as_f64)
1806 .unwrap_or(1.0);
1807 let mut memory = container
1808 .get("memory")
1809 .and_then(Value::as_i64)
1810 .unwrap_or(512);
1811 if let Some(rr) = container
1812 .get("resourceRequirements")
1813 .and_then(Value::as_array)
1814 {
1815 for r in rr {
1816 let ty = r.get("type").and_then(Value::as_str).unwrap_or("");
1817 let val = r
1818 .get("value")
1819 .and_then(Value::as_str)
1820 .and_then(|s| s.parse::<f64>().ok());
1821 match (ty, val) {
1822 ("VCPU", Some(v)) => vcpus = v,
1823 ("MEMORY", Some(v)) => memory = v as i64,
1824 _ => {}
1825 }
1826 }
1827 }
1828 (vcpus.max(0.25), memory.max(4))
1829}
1830
1831async fn save_snapshot_now(
1834 state: &SharedBatchState,
1835 store: &Option<Arc<dyn SnapshotStore>>,
1836 lock: &Arc<AsyncMutex<()>>,
1837) {
1838 let Some(store) = store.clone() else {
1839 return;
1840 };
1841 let _guard = lock.lock().await;
1842 let bytes = {
1843 let snap = BatchSnapshot {
1844 schema_version: BATCH_SNAPSHOT_SCHEMA_VERSION,
1845 accounts: Some(state.read().clone()),
1846 };
1847 serde_json::to_vec(&snap).unwrap_or_default()
1848 };
1849 let _ = tokio::task::spawn_blocking(move || store.save(&bytes)).await;
1850}
1851
1852fn with_array_index_env(mut container: Value, index: i64) -> Value {
1855 if let Some(obj) = container.as_object_mut() {
1856 let env = obj
1857 .entry("environment".to_string())
1858 .or_insert_with(|| json!([]));
1859 if let Some(arr) = env.as_array_mut() {
1860 arr.retain(|e| {
1861 e.get("name").and_then(Value::as_str) != Some("AWS_BATCH_JOB_ARRAY_INDEX")
1862 });
1863 arr.push(json!({ "name": "AWS_BATCH_JOB_ARRAY_INDEX", "value": index.to_string() }));
1864 }
1865 }
1866 container
1867}
1868
1869fn array_status_summary(children: &[&Value]) -> (Value, &'static str) {
1872 let mut summary = serde_json::Map::new();
1873 for s in [
1874 "SUBMITTED",
1875 "PENDING",
1876 "RUNNABLE",
1877 "STARTING",
1878 "RUNNING",
1879 "SUCCEEDED",
1880 "FAILED",
1881 ] {
1882 let n = children
1883 .iter()
1884 .filter(|c| c.get("status").and_then(Value::as_str) == Some(s))
1885 .count();
1886 summary.insert(s.to_string(), json!(n));
1887 }
1888 let total = children.len();
1889 let succeeded = summary["SUCCEEDED"].as_u64().unwrap_or(0) as usize;
1890 let failed = summary["FAILED"].as_u64().unwrap_or(0) as usize;
1891 let status = if total > 0 && succeeded + failed == total {
1892 if failed > 0 {
1893 "FAILED"
1894 } else {
1895 "SUCCEEDED"
1896 }
1897 } else if summary["RUNNING"].as_u64().unwrap_or(0) > 0
1898 || summary["STARTING"].as_u64().unwrap_or(0) > 0
1899 {
1900 "RUNNING"
1901 } else {
1902 "PENDING"
1903 };
1904 (Value::Object(summary), status)
1905}
1906
1907fn string_set(body: &Value, key: &str) -> std::collections::HashSet<String> {
1909 body.get(key)
1910 .and_then(Value::as_array)
1911 .map(|a| {
1912 a.iter()
1913 .filter_map(|v| v.as_str().map(String::from))
1914 .collect()
1915 })
1916 .unwrap_or_default()
1917}
1918
1919fn match_named(
1921 res: &Value,
1922 wanted: &std::collections::HashSet<String>,
1923 name_key: &str,
1924 arn_key: &str,
1925) -> bool {
1926 if wanted.is_empty() {
1927 return true;
1928 }
1929 let name = res.get(name_key).and_then(Value::as_str);
1930 let arn = res.get(arn_key).and_then(Value::as_str);
1931 name.map(|n| wanted.contains(n)).unwrap_or(false)
1932 || arn.map(|a| wanted.contains(a)).unwrap_or(false)
1933}
1934
1935fn arn_or_name(body: &Value, key: &str) -> Result<String, AwsServiceError> {
1938 let raw = body
1939 .get(key)
1940 .and_then(Value::as_str)
1941 .ok_or_else(|| client_error("ClientException", format!("{key} is required")))?;
1942 Ok(raw.rsplit('/').next().unwrap_or(raw).to_string())
1943}
1944
1945fn hex_val(b: u8) -> Option<u8> {
1946 match b {
1947 b'0'..=b'9' => Some(b - b'0'),
1948 b'a'..=b'f' => Some(b - b'a' + 10),
1949 b'A'..=b'F' => Some(b - b'A' + 10),
1950 _ => None,
1951 }
1952}
1953
1954fn percent_decode(s: &str) -> String {
1955 let bytes = s.as_bytes();
1960 let mut out = Vec::with_capacity(bytes.len());
1961 let mut i = 0;
1962 while i < bytes.len() {
1963 if bytes[i] == b'%' && i + 2 < bytes.len() {
1964 if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
1965 out.push(hi * 16 + lo);
1966 i += 3;
1967 continue;
1968 }
1969 }
1970 out.push(bytes[i]);
1971 i += 1;
1972 }
1973 String::from_utf8_lossy(&out).into_owned()
1974}
1975
1976#[async_trait]
1977impl AwsService for BatchService {
1978 fn service_name(&self) -> &str {
1979 "batch"
1980 }
1981
1982 fn supported_actions(&self) -> &[&str] {
1983 SUPPORTED_ACTIONS
1984 }
1985
1986 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1987 let Some(action) = Self::resolve_action(&req) else {
1988 return Err(AwsServiceError::aws_error(
1989 StatusCode::NOT_FOUND,
1990 "ResourceNotFoundException",
1991 format!("Unknown operation: {} {}", req.method, req.raw_path),
1992 ));
1993 };
1994 let result = if action == "SubmitJob" {
1997 self.submit_job(&req).await
1998 } else {
1999 self.dispatch(action, &req)
2000 };
2001 if MUTATING_ACTIONS.contains(&action)
2002 && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
2003 {
2004 self.save_snapshot().await;
2005 }
2006 result
2007 }
2008}
2009
2010#[cfg(test)]
2011mod tests {
2012 use super::*;
2013 use crate::state::BatchAccounts;
2014 use parking_lot::RwLock;
2015 use std::collections::HashMap;
2016
2017 fn svc() -> BatchService {
2018 BatchService::new(Arc::new(RwLock::new(BatchAccounts::new())))
2019 }
2020
2021 fn req(path: &str, body: Value) -> AwsRequest {
2022 let p = path.split('?').next().unwrap_or(path);
2023 let path_segments: Vec<String> = p
2024 .split('/')
2025 .filter(|s| !s.is_empty())
2026 .map(String::from)
2027 .collect();
2028 AwsRequest {
2029 service: "batch".into(),
2030 action: String::new(),
2031 region: "us-east-1".into(),
2032 account_id: "123456789012".into(),
2033 request_id: "t".into(),
2034 headers: http::HeaderMap::new(),
2035 query_params: HashMap::new(),
2036 body: bytes::Bytes::from(serde_json::to_vec(&body).unwrap()),
2037 body_stream: parking_lot::Mutex::new(None),
2038 path_segments,
2039 raw_path: path.to_string(),
2040 raw_query: String::new(),
2041 method: Method::POST,
2042 is_query_protocol: false,
2043 access_key_id: None,
2044 principal: None,
2045 }
2046 }
2047
2048 fn body_of(r: AwsResponse) -> Value {
2049 serde_json::from_slice(r.body.expect_bytes()).unwrap()
2050 }
2051
2052 async fn mk_queue(s: &BatchService, name: &str) {
2053 s.handle(req(
2054 "/v1/createjobqueue",
2055 json!({"jobQueueName": name, "priority": 1}),
2056 ))
2057 .await
2058 .unwrap();
2059 }
2060
2061 #[tokio::test]
2062 async fn compute_environment_lifecycle() {
2063 let s = svc();
2064 let r = s
2065 .handle(req(
2066 "/v1/createcomputeenvironment",
2067 json!({"computeEnvironmentName": "ce1", "type": "MANAGED"}),
2068 ))
2069 .await
2070 .unwrap();
2071 let v = body_of(r);
2072 assert_eq!(v["computeEnvironmentName"], "ce1");
2073 assert!(v["computeEnvironmentArn"]
2074 .as_str()
2075 .unwrap()
2076 .contains("compute-environment/ce1"));
2077
2078 let d = body_of(
2079 s.handle(req("/v1/describecomputeenvironments", json!({})))
2080 .await
2081 .unwrap(),
2082 );
2083 let ces = d["computeEnvironments"].as_array().unwrap();
2084 assert_eq!(ces.len(), 1);
2085 assert_eq!(ces[0]["status"], "VALID");
2086 assert_eq!(ces[0]["state"], "ENABLED");
2087
2088 s.handle(req(
2089 "/v1/deletecomputeenvironment",
2090 json!({"computeEnvironment": "ce1"}),
2091 ))
2092 .await
2093 .unwrap();
2094 let d2 = body_of(
2095 s.handle(req("/v1/describecomputeenvironments", json!({})))
2096 .await
2097 .unwrap(),
2098 );
2099 assert_eq!(d2["computeEnvironments"].as_array().unwrap().len(), 0);
2100 }
2101
2102 #[tokio::test]
2103 async fn job_definition_revisions_increment() {
2104 let s = svc();
2105 for expected in 1..=3 {
2106 let v = body_of(
2107 s.handle(req(
2108 "/v1/registerjobdefinition",
2109 json!({"jobDefinitionName": "jd", "type": "container"}),
2110 ))
2111 .await
2112 .unwrap(),
2113 );
2114 assert_eq!(v["revision"], expected);
2115 }
2116 let d = body_of(
2117 s.handle(req(
2118 "/v1/describejobdefinitions",
2119 json!({"jobDefinitionName": "jd"}),
2120 ))
2121 .await
2122 .unwrap(),
2123 );
2124 assert_eq!(d["jobDefinitions"].as_array().unwrap().len(), 3);
2125 }
2126
2127 #[tokio::test]
2128 async fn unimplemented_op_errors_not_fakes() {
2129 let s = svc();
2130 let err = match s
2133 .handle(req(
2134 "/v1/createconsumableresource",
2135 json!({"consumableResourceName": "r"}),
2136 ))
2137 .await
2138 {
2139 Err(e) => e,
2140 Ok(_) => panic!("unimplemented op must not fake-succeed"),
2141 };
2142 assert_eq!(err.status(), StatusCode::NOT_IMPLEMENTED);
2143 }
2144
2145 #[tokio::test]
2146 async fn tags_agree_across_describe_and_list_tags() {
2147 let s = svc();
2148 let c = body_of(
2150 s.handle(req(
2151 "/v1/createcomputeenvironment",
2152 json!({"computeEnvironmentName": "ce", "type": "MANAGED",
2153 "tags": {"team": "data"}}),
2154 ))
2155 .await
2156 .unwrap(),
2157 );
2158 let arn = c["computeEnvironmentArn"].as_str().unwrap().to_string();
2159 let enc = arn.replace('/', "%2F");
2160
2161 let mut lt = req(&format!("/v1/tags/{enc}"), json!({}));
2164 lt.method = Method::GET;
2165 let tags = body_of(s.handle(lt).await.unwrap());
2166 assert_eq!(tags["tags"]["team"], "data");
2167
2168 let mut tr = req(&format!("/v1/tags/{enc}"), json!({"tags": {"env": "prod"}}));
2170 tr.method = Method::POST;
2171 s.handle(tr).await.unwrap();
2172 let d = body_of(
2173 s.handle(req("/v1/describecomputeenvironments", json!({})))
2174 .await
2175 .unwrap(),
2176 );
2177 let ce = &d["computeEnvironments"][0];
2178 assert_eq!(ce["tags"]["team"], "data");
2179 assert_eq!(ce["tags"]["env"], "prod");
2180 assert_eq!(ce["containerOrchestrationType"], "ECS");
2182 }
2183
2184 #[tokio::test]
2185 async fn job_submit_describe_cancel_lifecycle() {
2186 let s = svc();
2187 mk_queue(&s, "q1").await;
2188 let sub = body_of(
2189 s.handle(req(
2190 "/v1/submitjob",
2191 json!({"jobName": "j1", "jobQueue": "q1", "jobDefinition": "jd:1"}),
2192 ))
2193 .await
2194 .unwrap(),
2195 );
2196 let job_id = sub["jobId"].as_str().unwrap().to_string();
2197 assert_eq!(sub["jobName"], "j1");
2198
2199 let d = body_of(
2200 s.handle(req("/v1/describejobs", json!({"jobs": [job_id]})))
2201 .await
2202 .unwrap(),
2203 );
2204 assert_eq!(d["jobs"][0]["status"], "SUBMITTED");
2205
2206 let running = body_of(
2208 s.handle(req("/v1/listjobs", json!({"jobQueue": "q1"})))
2209 .await
2210 .unwrap(),
2211 );
2212 assert_eq!(running["jobSummaryList"].as_array().unwrap().len(), 0);
2213 let l = body_of(
2216 s.handle(req(
2217 "/v1/listjobs",
2218 json!({"jobQueue": "q1", "jobStatus": "SUBMITTED"}),
2219 ))
2220 .await
2221 .unwrap(),
2222 );
2223 assert_eq!(l["jobSummaryList"].as_array().unwrap().len(), 1);
2224 assert!(l["jobSummaryList"][0]["jobArn"].as_str().is_some());
2225
2226 match s.handle(req("/v1/listjobs", json!({}))).await {
2228 Err(e) => assert!(format!("{e:?}").contains("exactly one")),
2229 Ok(_) => panic!("ListJobs without a selector must be rejected"),
2230 }
2231
2232 s.handle(req(
2233 "/v1/canceljob",
2234 json!({"jobId": job_id, "reason": "stop it"}),
2235 ))
2236 .await
2237 .unwrap();
2238 let d2 = body_of(
2239 s.handle(req("/v1/describejobs", json!({"jobs": [job_id]})))
2240 .await
2241 .unwrap(),
2242 );
2243 assert_eq!(d2["jobs"][0]["status"], "FAILED");
2244 assert_eq!(d2["jobs"][0]["statusReason"], "stop it");
2245 }
2246
2247 #[tokio::test]
2248 async fn scheduling_policy_crud() {
2249 let s = svc();
2250 let c = body_of(
2251 s.handle(req("/v1/createschedulingpolicy", json!({"name": "sp1"})))
2252 .await
2253 .unwrap(),
2254 );
2255 let arn = c["arn"].as_str().unwrap().to_string();
2256 assert!(arn.contains("scheduling-policy/sp1"));
2257 let d = body_of(
2258 s.handle(req(
2259 "/v1/describeschedulingpolicies",
2260 json!({"arns": [arn]}),
2261 ))
2262 .await
2263 .unwrap(),
2264 );
2265 assert_eq!(d["schedulingPolicies"].as_array().unwrap().len(), 1);
2266 s.handle(req("/v1/deleteschedulingpolicy", json!({"arn": "sp1"})))
2267 .await
2268 .unwrap();
2269 let l = body_of(
2270 s.handle(req("/v1/listschedulingpolicies", json!({})))
2271 .await
2272 .unwrap(),
2273 );
2274 assert_eq!(l["schedulingPolicies"].as_array().unwrap().len(), 0);
2275 }
2276
2277 #[tokio::test]
2278 async fn update_compute_environment_persists() {
2279 let s = svc();
2280 s.handle(req(
2281 "/v1/createcomputeenvironment",
2282 json!({"computeEnvironmentName": "ce1", "state": "ENABLED"}),
2283 ))
2284 .await
2285 .unwrap();
2286 s.handle(req(
2287 "/v1/updatecomputeenvironment",
2288 json!({"computeEnvironment": "ce1", "state": "DISABLED"}),
2289 ))
2290 .await
2291 .unwrap();
2292 let d = body_of(
2293 s.handle(req("/v1/describecomputeenvironments", json!({})))
2294 .await
2295 .unwrap(),
2296 );
2297 assert_eq!(d["computeEnvironments"][0]["state"], "DISABLED");
2298 }
2299
2300 #[test]
2301 fn container_resources_from_both_shapes() {
2302 let (v, m) = container_resources(&json!({"vcpus": 2, "memory": 2048}));
2304 assert_eq!(v, 2.0);
2305 assert_eq!(m, 2048);
2306 let (v, m) = container_resources(&json!({
2308 "resourceRequirements": [
2309 {"type": "VCPU", "value": "4"},
2310 {"type": "MEMORY", "value": "8192"}
2311 ]
2312 }));
2313 assert_eq!(v, 4.0);
2314 assert_eq!(m, 8192);
2315 let (v, m) = container_resources(&json!({}));
2317 assert_eq!(v, 1.0);
2318 assert_eq!(m, 512);
2319 }
2320
2321 #[tokio::test]
2322 async fn resolve_container_picks_latest_revision_and_overrides() {
2323 let s = svc();
2324 for cmd in [json!(["echo", "v1"]), json!(["echo", "v2"])] {
2325 s.handle(req(
2326 "/v1/registerjobdefinition",
2327 json!({
2328 "jobDefinitionName": "jd",
2329 "type": "container",
2330 "containerProperties": {"image": "alpine", "command": cmd}
2331 }),
2332 ))
2333 .await
2334 .unwrap();
2335 }
2336 let c = s
2338 .resolve_container("123456789012", "jd", &json!({}))
2339 .unwrap();
2340 assert_eq!(c["command"], json!(["echo", "v2"]));
2341 let c = s
2343 .resolve_container(
2344 "123456789012",
2345 "jd:1",
2346 &json!({"containerOverrides": {"command": ["overridden"]}}),
2347 )
2348 .unwrap();
2349 assert_eq!(c["command"], json!(["overridden"]));
2350 assert_eq!(c["image"], "alpine");
2351 }
2352
2353 #[tokio::test]
2354 async fn submit_without_ecs_parks_at_submitted_never_auto_succeeds() {
2355 let s = svc();
2358 mk_queue(&s, "q").await;
2359 s.handle(req(
2360 "/v1/registerjobdefinition",
2361 json!({"jobDefinitionName": "jd", "type": "container",
2362 "containerProperties": {"image": "alpine"}}),
2363 ))
2364 .await
2365 .unwrap();
2366 let sub = body_of(
2367 s.handle(req(
2368 "/v1/submitjob",
2369 json!({"jobName": "j", "jobQueue": "q", "jobDefinition": "jd"}),
2370 ))
2371 .await
2372 .unwrap(),
2373 );
2374 let id = sub["jobId"].as_str().unwrap().to_string();
2375 let d = body_of(
2376 s.handle(req("/v1/describejobs", json!({"jobs": [id]})))
2377 .await
2378 .unwrap(),
2379 );
2380 assert_eq!(d["jobs"][0]["status"], "SUBMITTED");
2381 }
2382
2383 #[tokio::test]
2384 async fn reconcile_fails_in_flight_jobs_after_restart() {
2385 let s = svc();
2389 mk_queue(&s, "q").await;
2390 s.handle(req(
2391 "/v1/registerjobdefinition",
2392 json!({"jobDefinitionName": "jd", "type": "container",
2393 "containerProperties": {"image": "alpine"}}),
2394 ))
2395 .await
2396 .unwrap();
2397 let sub = body_of(
2398 s.handle(req(
2399 "/v1/submitjob",
2400 json!({"jobName": "j", "jobQueue": "q", "jobDefinition": "jd"}),
2401 ))
2402 .await
2403 .unwrap(),
2404 );
2405 let id = sub["jobId"].as_str().unwrap().to_string();
2406
2407 s.reconcile_persisted_jobs().await;
2408
2409 let d = body_of(
2410 s.handle(req("/v1/describejobs", json!({"jobs": [id]})))
2411 .await
2412 .unwrap(),
2413 );
2414 assert_eq!(d["jobs"][0]["status"], "FAILED");
2415 assert_eq!(
2416 d["jobs"][0]["statusReason"],
2417 "Job interrupted by a fakecloud restart"
2418 );
2419 assert!(d["jobs"][0]["stoppedAt"].is_i64());
2420
2421 s.reconcile_persisted_jobs().await;
2423 let d2 = body_of(
2424 s.handle(req("/v1/describejobs", json!({"jobs": [id]})))
2425 .await
2426 .unwrap(),
2427 );
2428 assert_eq!(d2["jobs"][0]["status"], "FAILED");
2429 }
2430
2431 #[test]
2432 fn array_index_env_injected() {
2433 let c = with_array_index_env(json!({"image": "alpine"}), 5);
2434 let env = c["environment"].as_array().unwrap();
2435 assert!(env
2436 .iter()
2437 .any(|e| e["name"] == "AWS_BATCH_JOB_ARRAY_INDEX" && e["value"] == "5"));
2438 }
2439
2440 #[tokio::test]
2441 async fn array_job_spawns_children_and_parent_aggregates() {
2442 let s = svc();
2443 mk_queue(&s, "q").await;
2444 s.handle(req(
2445 "/v1/registerjobdefinition",
2446 json!({"jobDefinitionName": "jd", "type": "container",
2447 "containerProperties": {"image": "alpine"}}),
2448 ))
2449 .await
2450 .unwrap();
2451 let sub = body_of(
2452 s.handle(req(
2453 "/v1/submitjob",
2454 json!({"jobName": "arr", "jobQueue": "q", "jobDefinition": "jd",
2455 "arrayProperties": {"size": 3}}),
2456 ))
2457 .await
2458 .unwrap(),
2459 );
2460 let parent = sub["jobId"].as_str().unwrap().to_string();
2461
2462 let listed = body_of(
2465 s.handle(req(
2466 "/v1/listjobs",
2467 json!({"arrayJobId": parent, "jobStatus": "SUBMITTED"}),
2468 ))
2469 .await
2470 .unwrap(),
2471 );
2472 let children = listed["jobSummaryList"].as_array().unwrap();
2473 assert_eq!(children.len(), 3);
2474 assert!(children.iter().all(|j| j["jobId"]
2475 .as_str()
2476 .unwrap()
2477 .starts_with(&format!("{parent}:"))));
2478
2479 let d = body_of(
2481 s.handle(req("/v1/describejobs", json!({"jobs": [parent]})))
2482 .await
2483 .unwrap(),
2484 );
2485 let p = &d["jobs"][0];
2486 assert_eq!(p["status"], "PENDING");
2487 assert_eq!(p["arrayProperties"]["statusSummary"]["SUBMITTED"], 3);
2488 assert_eq!(p["arrayProperties"]["size"], 3);
2489 }
2490
2491 #[test]
2492 fn array_summary_terminal_states() {
2493 let succ = json!({"status": "SUCCEEDED"});
2494 let fail = json!({"status": "FAILED"});
2495 let run = json!({"status": "RUNNING"});
2496 let (_, st) = array_status_summary(&[&succ, &succ]);
2497 assert_eq!(st, "SUCCEEDED");
2498 let (_, st) = array_status_summary(&[&succ, &fail]);
2499 assert_eq!(st, "FAILED");
2500 let (_, st) = array_status_summary(&[&succ, &run]);
2501 assert_eq!(st, "RUNNING");
2502 }
2503
2504 #[tokio::test]
2505 async fn depends_on_parks_at_pending() {
2506 let s = svc();
2507 mk_queue(&s, "q").await;
2508 s.handle(req(
2509 "/v1/registerjobdefinition",
2510 json!({"jobDefinitionName": "jd", "type": "container",
2511 "containerProperties": {"image": "alpine"}}),
2512 ))
2513 .await
2514 .unwrap();
2515 let a = body_of(
2516 s.handle(req(
2517 "/v1/submitjob",
2518 json!({"jobName": "a", "jobQueue": "q", "jobDefinition": "jd"}),
2519 ))
2520 .await
2521 .unwrap(),
2522 );
2523 let a_id = a["jobId"].as_str().unwrap().to_string();
2524 let b = body_of(
2525 s.handle(req(
2526 "/v1/submitjob",
2527 json!({"jobName": "b", "jobQueue": "q", "jobDefinition": "jd",
2528 "dependsOn": [{"jobId": a_id, "type": "SEQUENTIAL"}]}),
2529 ))
2530 .await
2531 .unwrap(),
2532 );
2533 let d = body_of(
2536 s.handle(req(
2537 "/v1/describejobs",
2538 json!({"jobs": [b["jobId"].clone()]}),
2539 ))
2540 .await
2541 .unwrap(),
2542 );
2543 assert_eq!(d["jobs"][0]["status"], "PENDING");
2544 }
2545
2546 #[test]
2547 fn routes_tag_family_by_method() {
2548 let mut r = req("/v1/tags/arn%3Aaws", json!({}));
2549 r.method = Method::GET;
2550 assert_eq!(
2551 BatchService::resolve_action(&r),
2552 Some("ListTagsForResource")
2553 );
2554 r.method = Method::DELETE;
2555 assert_eq!(BatchService::resolve_action(&r), Some("UntagResource"));
2556 }
2557
2558 #[test]
2559 fn percent_decode_handles_multibyte_without_panicking() {
2560 assert_eq!(percent_decode("%€"), "%€");
2564 assert_eq!(percent_decode("%"), "%");
2566 assert_eq!(percent_decode("%2"), "%2");
2567 assert_eq!(percent_decode("%zz"), "%zz");
2568 assert_eq!(percent_decode("arn%3Aaws%3Abatch"), "arn:aws:batch");
2570 assert_eq!(percent_decode("caf%C3%A9"), "café");
2571 assert_eq!(percent_decode("plain"), "plain");
2572 }
2573
2574 #[tokio::test]
2575 async fn list_tags_with_multibyte_arn_does_not_panic() {
2576 let s = svc();
2577 let mut r = req("/v1/tags/%€", json!({}));
2580 r.method = Method::GET;
2581 let resp = s.handle(r).await.unwrap();
2582 let v = body_of(resp);
2583 assert!(v.get("tags").is_some());
2584 }
2585
2586 #[tokio::test]
2587 async fn submit_job_rejects_out_of_range_array_size() {
2588 let s = svc();
2589 mk_queue(&s, "q").await;
2590 for bad in [1_i64, 0, -3, 10_001, 2_000_000_000] {
2591 let res = s
2592 .handle(req(
2593 "/v1/submitjob",
2594 json!({
2595 "jobName": "j", "jobQueue": "q", "jobDefinition": "d",
2596 "arrayProperties": {"size": bad}
2597 }),
2598 ))
2599 .await;
2600 match res {
2601 Err(e) => assert!(
2602 format!("{e:?}").contains("between 2 and 10000"),
2603 "size {bad}: wrong error {e:?}"
2604 ),
2605 Ok(_) => panic!("size {bad}: out-of-range array size must be rejected"),
2606 }
2607 }
2608 }
2609}