// fakecloud_batch/service.rs

//! AWS Batch restJson1 service dispatch + core control plane.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use http::{Method, StatusCode};
7use serde_json::{json, Map, Value};
8use tokio::sync::Mutex as AsyncMutex;
9use uuid::Uuid;
10
11use fakecloud_aws::arn::Arn;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_persistence::{SnapshotHook, SnapshotStore};
14
15use crate::state::{BatchSnapshot, SharedBatchState, BATCH_SNAPSHOT_SCHEMA_VERSION};
16
/// Batch operation names listed as supported by this service.
///
/// Every entry has a lowercase path-segment counterpart in
/// `BatchService::resolve_action`, except the three tag operations
/// (`ListTagsForResource`, `TagResource`, `UntagResource`), which are resolved
/// from the HTTP method on the `/v1/tags/...` path instead.
///
/// NOTE(review): `BatchService::dispatch` routes only a subset of these names;
/// presumably the rest are handled by code outside this view — confirm.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CancelJob",
    "CreateComputeEnvironment",
    "CreateConsumableResource",
    "CreateJobQueue",
    "CreateQuotaShare",
    "CreateSchedulingPolicy",
    "CreateServiceEnvironment",
    "DeleteComputeEnvironment",
    "DeleteConsumableResource",
    "DeleteJobQueue",
    "DeleteQuotaShare",
    "DeleteSchedulingPolicy",
    "DeleteServiceEnvironment",
    "DeregisterJobDefinition",
    "DescribeComputeEnvironments",
    "DescribeConsumableResource",
    "DescribeJobDefinitions",
    "DescribeJobQueues",
    "DescribeJobs",
    "DescribeQuotaShare",
    "DescribeSchedulingPolicies",
    "DescribeServiceEnvironments",
    "DescribeServiceJob",
    "GetJobQueueSnapshot",
    "ListConsumableResources",
    "ListJobs",
    "ListJobsByConsumableResource",
    "ListQuotaShares",
    "ListSchedulingPolicies",
    "ListServiceJobs",
    "ListTagsForResource",
    "RegisterJobDefinition",
    "SubmitJob",
    "SubmitServiceJob",
    "TagResource",
    "TerminateJob",
    "TerminateServiceJob",
    "UntagResource",
    "UpdateComputeEnvironment",
    "UpdateConsumableResource",
    "UpdateJobQueue",
    "UpdateQuotaShare",
    "UpdateSchedulingPolicy",
    "UpdateServiceEnvironment",
    "UpdateServiceJob",
];
64
/// Mutating actions trigger a snapshot write after success.
///
/// NOTE(review): several names in `SUPPORTED_ACTIONS` that look state-changing
/// (e.g. `CreateConsumableResource`, `CreateQuotaShare`,
/// `CreateServiceEnvironment`, `SubmitServiceJob`, `TerminateServiceJob` and
/// their Update/Delete siblings) are not listed here — confirm whether that is
/// intentional or whether their changes are missed by snapshotting.
const MUTATING_ACTIONS: &[&str] = &[
    "CreateComputeEnvironment",
    "UpdateComputeEnvironment",
    "DeleteComputeEnvironment",
    "CreateJobQueue",
    "UpdateJobQueue",
    "DeleteJobQueue",
    "RegisterJobDefinition",
    "DeregisterJobDefinition",
    "CreateSchedulingPolicy",
    "UpdateSchedulingPolicy",
    "DeleteSchedulingPolicy",
    "SubmitJob",
    "CancelJob",
    "TerminateJob",
    "TagResource",
    "UntagResource",
];
84
/// Batch service front end: shared state plus optional snapshot persistence
/// and an optional ECS backend.
pub struct BatchService {
    /// Shared Batch state, accessed through its `read()` / `write()` guards.
    state: SharedBatchState,
    /// Destination for snapshots; `None` makes snapshot writes a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Held while a snapshot is serialized and saved, so that snapshot writes
    /// from this service and from `snapshot_hook` do not interleave.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// ECS backend so a submitted job runs as a REAL container (the wedge:
    /// every rival fakes Batch compute). `None` parks jobs at SUBMITTED
    /// honestly (unit tests / no container runtime), never auto-succeeds.
    ecs_state: Option<fakecloud_ecs::SharedEcsState>,
    /// Optional ECS runtime handle, supplied together with `ecs_state` via
    /// `with_ecs`.
    ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
}
95
96impl BatchService {
97    pub fn new(state: SharedBatchState) -> Self {
98        Self {
99            state,
100            snapshot_store: None,
101            snapshot_lock: Arc::new(AsyncMutex::new(())),
102            ecs_state: None,
103            ecs_runtime: None,
104        }
105    }
106
107    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
108        self.snapshot_store = Some(store);
109        self
110    }
111
112    /// Attach the ECS backend so SubmitJob launches a real container-backed
113    /// task and drives the job status off its real exit code.
114    pub fn with_ecs(
115        mut self,
116        state: fakecloud_ecs::SharedEcsState,
117        runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
118    ) -> Self {
119        self.ecs_state = Some(state);
120        self.ecs_runtime = runtime;
121        self
122    }
123
124    async fn save_snapshot(&self) {
125        let Some(store) = self.snapshot_store.clone() else {
126            return;
127        };
128        let _guard = self.snapshot_lock.lock().await;
129        let bytes = {
130            let snap = BatchSnapshot {
131                schema_version: BATCH_SNAPSHOT_SCHEMA_VERSION,
132                accounts: Some(self.state.read().clone()),
133            };
134            serde_json::to_vec(&snap).unwrap_or_default()
135        };
136        let _ = tokio::task::spawn_blocking(move || store.save(&bytes)).await;
137    }
138
139    /// CloudFormation write-through hook (used once the CFN provisioner lands).
140    pub fn snapshot_hook(&self) -> Option<SnapshotHook> {
141        let store = self.snapshot_store.clone()?;
142        let state = self.state.clone();
143        let lock = self.snapshot_lock.clone();
144        Some(Arc::new(move || {
145            let store = store.clone();
146            let state = state.clone();
147            let lock = lock.clone();
148            Box::pin(async move {
149                let _guard = lock.lock().await;
150                let bytes = {
151                    let snap = BatchSnapshot {
152                        schema_version: BATCH_SNAPSHOT_SCHEMA_VERSION,
153                        accounts: Some(state.read().clone()),
154                    };
155                    serde_json::to_vec(&snap).unwrap_or_default()
156                };
157                let _ = tokio::task::spawn_blocking(move || store.save(&bytes)).await;
158            })
159        }))
160    }
161
162    /// Reconcile jobs restored from a snapshot. After a restart the background
163    /// drivers that advance a job (status-sync / dependency-waiter) are gone and
164    /// the backing ECS task was already STOPPED by the ECS reconcile, so an
165    /// in-flight job can never reach a terminal state on its own — it would hang
166    /// at RUNNING/PENDING forever. Fail every non-terminal job with a clear
167    /// reason instead of leaving a zombie.
168    pub async fn reconcile_persisted_jobs(&self) {
169        const NON_TERMINAL: &[&str] = &["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"];
170        let now = chrono::Utc::now().timestamp_millis();
171        let mut changed = false;
172        {
173            let mut accounts = self.state.write();
174            for acct in accounts.accounts.values_mut() {
175                for job in acct.jobs.values_mut() {
176                    let Some(o) = job.as_object_mut() else {
177                        continue;
178                    };
179                    let st = o.get("status").and_then(Value::as_str).unwrap_or("");
180                    if NON_TERMINAL.contains(&st) {
181                        o.insert("status".into(), json!("FAILED"));
182                        o.insert(
183                            "statusReason".into(),
184                            json!("Job interrupted by a fakecloud restart"),
185                        );
186                        o.entry("stoppedAt".to_string())
187                            .or_insert_with(|| json!(now));
188                        changed = true;
189                    }
190                }
191            }
192        }
193        if changed {
194            self.save_snapshot().await;
195        }
196    }
197
198    /// Map the restJson1 request (POST /v1/<op>, plus the /v1/tags/{arn}
199    /// family) to its operation name.
200    fn resolve_action(req: &AwsRequest) -> Option<&'static str> {
201        let segs = &req.path_segments;
202        if segs.first().map(|s| s.as_str()) != Some("v1") {
203            return None;
204        }
205        // Tag family: /v1/tags/{resourceArn}
206        if segs.get(1).map(|s| s.as_str()) == Some("tags") {
207            return match req.method {
208                Method::GET => Some("ListTagsForResource"),
209                Method::POST => Some("TagResource"),
210                Method::DELETE => Some("UntagResource"),
211                _ => None,
212            };
213        }
214        let op = segs.get(1)?.as_str();
215        Some(match op {
216            "canceljob" => "CancelJob",
217            "createcomputeenvironment" => "CreateComputeEnvironment",
218            "createconsumableresource" => "CreateConsumableResource",
219            "createjobqueue" => "CreateJobQueue",
220            "createquotashare" => "CreateQuotaShare",
221            "createschedulingpolicy" => "CreateSchedulingPolicy",
222            "createserviceenvironment" => "CreateServiceEnvironment",
223            "deletecomputeenvironment" => "DeleteComputeEnvironment",
224            "deleteconsumableresource" => "DeleteConsumableResource",
225            "deletejobqueue" => "DeleteJobQueue",
226            "deletequotashare" => "DeleteQuotaShare",
227            "deleteschedulingpolicy" => "DeleteSchedulingPolicy",
228            "deleteserviceenvironment" => "DeleteServiceEnvironment",
229            "deregisterjobdefinition" => "DeregisterJobDefinition",
230            "describecomputeenvironments" => "DescribeComputeEnvironments",
231            "describeconsumableresource" => "DescribeConsumableResource",
232            "describejobdefinitions" => "DescribeJobDefinitions",
233            "describejobqueues" => "DescribeJobQueues",
234            "describejobs" => "DescribeJobs",
235            "describequotashare" => "DescribeQuotaShare",
236            "describeschedulingpolicies" => "DescribeSchedulingPolicies",
237            "describeserviceenvironments" => "DescribeServiceEnvironments",
238            "describeservicejob" => "DescribeServiceJob",
239            "getjobqueuesnapshot" => "GetJobQueueSnapshot",
240            "listconsumableresources" => "ListConsumableResources",
241            "listjobs" => "ListJobs",
242            "listjobsbyconsumableresource" => "ListJobsByConsumableResource",
243            "listquotashares" => "ListQuotaShares",
244            "listschedulingpolicies" => "ListSchedulingPolicies",
245            "listservicejobs" => "ListServiceJobs",
246            "registerjobdefinition" => "RegisterJobDefinition",
247            "submitjob" => "SubmitJob",
248            "submitservicejob" => "SubmitServiceJob",
249            "terminatejob" => "TerminateJob",
250            "terminateservicejob" => "TerminateServiceJob",
251            "updatecomputeenvironment" => "UpdateComputeEnvironment",
252            "updateconsumableresource" => "UpdateConsumableResource",
253            "updatejobqueue" => "UpdateJobQueue",
254            "updatequotashare" => "UpdateQuotaShare",
255            "updateschedulingpolicy" => "UpdateSchedulingPolicy",
256            "updateserviceenvironment" => "UpdateServiceEnvironment",
257            "updateservicejob" => "UpdateServiceJob",
258            _ => return None,
259        })
260    }
261
262    fn dispatch(&self, action: &str, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
263        match action {
264            "CreateComputeEnvironment" => self.create_compute_environment(req),
265            "DescribeComputeEnvironments" => self.describe_compute_environments(req),
266            "DeleteComputeEnvironment" => self.delete_compute_environment(req),
267            "CreateJobQueue" => self.create_job_queue(req),
268            "DescribeJobQueues" => self.describe_job_queues(req),
269            "DeleteJobQueue" => self.delete_job_queue(req),
270            "UpdateComputeEnvironment" => self.update_compute_environment(req),
271            "UpdateJobQueue" => self.update_job_queue(req),
272            "RegisterJobDefinition" => self.register_job_definition(req),
273            "DescribeJobDefinitions" => self.describe_job_definitions(req),
274            "DeregisterJobDefinition" => self.deregister_job_definition(req),
275            "CreateSchedulingPolicy" => self.create_scheduling_policy(req),
276            "DescribeSchedulingPolicies" => self.describe_scheduling_policies(req),
277            "ListSchedulingPolicies" => self.list_scheduling_policies(req),
278            "UpdateSchedulingPolicy" => self.update_scheduling_policy(req),
279            "DeleteSchedulingPolicy" => self.delete_scheduling_policy(req),
280            "DescribeJobs" => self.describe_jobs(req),
281            "ListJobs" => self.list_jobs(req),
282            "CancelJob" => self.cancel_job(req),
283            "TerminateJob" => self.terminate_job(req),
284            "TagResource" => self.tag_resource(req),
285            "UntagResource" => self.untag_resource(req),
286            "ListTagsForResource" => self.list_tags_for_resource(req),
287            other => Err(AwsServiceError::action_not_implemented("batch", other)),
288        }
289    }
290}
291
292fn obj(v: &Value) -> Map<String, Value> {
293    v.as_object().cloned().unwrap_or_default()
294}
295
296fn client_error(code: &str, msg: impl Into<String>) -> AwsServiceError {
297    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, code, msg.into())
298}
299
/// Tag storage: resource ARN -> (tag key -> tag value).
type TagStore = std::collections::BTreeMap<String, std::collections::BTreeMap<String, String>>;
301
302/// Seed the authoritative per-ARN tag store from a resource's inline `tags`
303/// (what Create receives), so `ListTagsForResource` reflects create-time tags.
304fn seed_inline_tags(tags: &mut TagStore, arn: &str, stored: &Map<String, Value>) {
305    if let Some(inline) = stored.get("tags").and_then(Value::as_object) {
306        let entry = tags.entry(arn.to_string()).or_default();
307        for (k, v) in inline {
308            if let Some(s) = v.as_str() {
309                entry.insert(k.clone(), s.to_string());
310            }
311        }
312    }
313}
314
315/// Overlay the authoritative tag store onto a resource on read, so `Describe*`
316/// reflects `TagResource`/`UntagResource` (which write only to the store) — the
317/// two used to diverge, breaking terraform tag updates.
318fn merge_tag_overlay(resource: &Value, arn_key: &str, tags: &TagStore) -> Value {
319    let mut o = obj(resource);
320    if let Some(arn) = o.get(arn_key).and_then(Value::as_str).map(String::from) {
321        if let Some(t) = tags.get(&arn) {
322            o.insert(
323                "tags".into(),
324                Value::Object(t.iter().map(|(k, v)| (k.clone(), json!(v))).collect()),
325            );
326        }
327    }
328    Value::Object(o)
329}
330
331/// Build a Batch `JobSummary` from a stored job, carrying every AWS-present
332/// field that exists on the record (jobId/jobName/createdAt/status are always
333/// there; the rest appear once the job has progressed).
334fn job_summary(j: &Value) -> Value {
335    let mut s = serde_json::Map::new();
336    for key in [
337        "jobId",
338        "jobArn",
339        "jobName",
340        "createdAt",
341        "status",
342        "statusReason",
343        "startedAt",
344        "stoppedAt",
345        "jobDefinition",
346        "container",
347        "arrayProperties",
348        "nodeProperties",
349    ] {
350        if let Some(v) = j.get(key) {
351            s.insert(key.to_string(), v.clone());
352        }
353    }
354    Value::Object(s)
355}
356
357impl BatchService {
358    fn arn(&self, account: &str, region: &str, resource: &str) -> String {
359        Arn::new("batch", region, account, resource).to_string()
360    }
361
362    // ---- Compute environments ----
363
364    fn create_compute_environment(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
365        let body = req.json_body();
366        let name = body
367            .get("computeEnvironmentName")
368            .and_then(Value::as_str)
369            .ok_or_else(|| client_error("ClientException", "computeEnvironmentName is required"))?
370            .to_string();
371        let arn = self.arn(
372            &req.account_id,
373            &req.region,
374            &format!("compute-environment/{name}"),
375        );
376        let mut stored = obj(&body);
377        stored.insert("computeEnvironmentArn".into(), json!(arn));
378        stored.insert("status".into(), json!("VALID"));
379        stored.insert("statusReason".into(), json!("ComputeEnvironment Healthy"));
380        stored
381            .entry("state".to_string())
382            .or_insert_with(|| json!("ENABLED"));
383        let uuid = Uuid::new_v4().to_string();
384        // Managed compute environments are backed by a real ECS cluster; AWS
385        // returns its ARN and the provider (and ECS console) read it back.
386        stored.insert(
387            "ecsClusterArn".into(),
388            json!(format!(
389                "arn:aws:ecs:{}:{}:cluster/AWSBatch-{name}-{uuid}",
390                req.region, req.account_id
391            )),
392        );
393        stored.insert("uuid".into(), json!(uuid));
394
395        let mut accounts = self.state.write();
396        let st = accounts.get_or_create(&req.account_id);
397        if st.compute_environments.contains_key(&name) {
398            return Err(client_error(
399                "ClientException",
400                format!("Object already exists: {name}"),
401            ));
402        }
403        seed_inline_tags(&mut st.tags, &arn, &stored);
404        st.compute_environments
405            .insert(name.clone(), Value::Object(stored));
406        Ok(AwsResponse::ok_json(json!({
407            "computeEnvironmentName": name,
408            "computeEnvironmentArn": arn,
409        })))
410    }
411
412    fn describe_compute_environments(
413        &self,
414        req: &AwsRequest,
415    ) -> Result<AwsResponse, AwsServiceError> {
416        let body = req.json_body();
417        let wanted = string_set(&body, "computeEnvironments");
418        let accounts = self.state.read();
419        let items: Vec<Value> = accounts
420            .get(&req.account_id)
421            .map(|st| {
422                st.compute_environments
423                    .values()
424                    .filter(|ce| {
425                        match_named(
426                            ce,
427                            &wanted,
428                            "computeEnvironmentName",
429                            "computeEnvironmentArn",
430                        )
431                    })
432                    .map(|ce| {
433                        let mut v = merge_tag_overlay(ce, "computeEnvironmentArn", &st.tags);
434                        // AWS always reports the orchestration backend.
435                        if let Some(o) = v.as_object_mut() {
436                            o.entry("containerOrchestrationType".to_string())
437                                .or_insert_with(|| json!("ECS"));
438                        }
439                        v
440                    })
441                    .collect()
442            })
443            .unwrap_or_default();
444        Ok(AwsResponse::ok_json(
445            json!({ "computeEnvironments": items }),
446        ))
447    }
448
449    fn delete_compute_environment(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
450        let body = req.json_body();
451        let name = arn_or_name(&body, "computeEnvironment")?;
452        let mut accounts = self.state.write();
453        accounts
454            .get_or_create(&req.account_id)
455            .compute_environments
456            .remove(&name);
457        Ok(AwsResponse::ok_json(json!({})))
458    }
459
460    // ---- Job queues ----
461
462    fn create_job_queue(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
463        let body = req.json_body();
464        let name = body
465            .get("jobQueueName")
466            .and_then(Value::as_str)
467            .ok_or_else(|| client_error("ClientException", "jobQueueName is required"))?
468            .to_string();
469        let arn = self.arn(&req.account_id, &req.region, &format!("job-queue/{name}"));
470        let mut stored = obj(&body);
471        stored.insert("jobQueueArn".into(), json!(arn));
472        stored.insert("status".into(), json!("VALID"));
473        stored.insert("statusReason".into(), json!("JobQueue Healthy"));
474        stored
475            .entry("state".to_string())
476            .or_insert_with(|| json!("ENABLED"));
477
478        let mut accounts = self.state.write();
479        let st = accounts.get_or_create(&req.account_id);
480        if st.job_queues.contains_key(&name) {
481            return Err(client_error(
482                "ClientException",
483                format!("Object already exists: {name}"),
484            ));
485        }
486        seed_inline_tags(&mut st.tags, &arn, &stored);
487        st.job_queues.insert(name.clone(), Value::Object(stored));
488        Ok(AwsResponse::ok_json(json!({
489            "jobQueueName": name,
490            "jobQueueArn": arn,
491        })))
492    }
493
494    fn describe_job_queues(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
495        let body = req.json_body();
496        let wanted = string_set(&body, "jobQueues");
497        let accounts = self.state.read();
498        let items: Vec<Value> = accounts
499            .get(&req.account_id)
500            .map(|st| {
501                st.job_queues
502                    .values()
503                    .filter(|q| match_named(q, &wanted, "jobQueueName", "jobQueueArn"))
504                    .map(|q| merge_tag_overlay(q, "jobQueueArn", &st.tags))
505                    .collect()
506            })
507            .unwrap_or_default();
508        Ok(AwsResponse::ok_json(json!({ "jobQueues": items })))
509    }
510
511    fn delete_job_queue(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
512        let body = req.json_body();
513        let name = arn_or_name(&body, "jobQueue")?;
514        let mut accounts = self.state.write();
515        accounts
516            .get_or_create(&req.account_id)
517            .job_queues
518            .remove(&name);
519        Ok(AwsResponse::ok_json(json!({})))
520    }
521
522    // ---- Job definitions (revisioned) ----
523
524    fn register_job_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
525        let body = req.json_body();
526        let name = body
527            .get("jobDefinitionName")
528            .and_then(Value::as_str)
529            .ok_or_else(|| client_error("ClientException", "jobDefinitionName is required"))?
530            .to_string();
531        let mut accounts = self.state.write();
532        let st = accounts.get_or_create(&req.account_id);
533        let revision = st.job_def_revisions.entry(name.clone()).or_insert(0);
534        *revision += 1;
535        let revision = *revision;
536        let arn = self.arn(
537            &req.account_id,
538            &req.region,
539            &format!("job-definition/{name}:{revision}"),
540        );
541        let mut stored = obj(&body);
542        stored.insert("jobDefinitionArn".into(), json!(arn));
543        stored.insert("revision".into(), json!(revision));
544        stored.insert("status".into(), json!("ACTIVE"));
545        // AWS defaults the optional list members of containerProperties to empty
546        // arrays and echoes them back on describe; clients (the terraform
547        // provider) read these back and expect them present.
548        if let Some(cp) = stored
549            .get_mut("containerProperties")
550            .and_then(Value::as_object_mut)
551        {
552            for key in [
553                "environment",
554                "mountPoints",
555                "resourceRequirements",
556                "secrets",
557                "ulimits",
558                "volumes",
559            ] {
560                cp.entry(key.to_string()).or_insert_with(|| json!([]));
561            }
562        }
563        seed_inline_tags(&mut st.tags, &arn, &stored);
564        st.job_definitions
565            .insert(format!("{name}:{revision}"), Value::Object(stored));
566        Ok(AwsResponse::ok_json(json!({
567            "jobDefinitionName": name,
568            "jobDefinitionArn": arn,
569            "revision": revision,
570        })))
571    }
572
573    fn describe_job_definitions(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
574        let body = req.json_body();
575        let wanted = string_set(&body, "jobDefinitions");
576        let name_filter = body
577            .get("jobDefinitionName")
578            .and_then(Value::as_str)
579            .map(|s| s.to_string());
580        let status_filter = body
581            .get("status")
582            .and_then(Value::as_str)
583            .map(|s| s.to_string());
584        let accounts = self.state.read();
585        let items: Vec<Value> = accounts
586            .get(&req.account_id)
587            .map(|st| {
588                st.job_definitions
589                    .values()
590                    .filter(|jd| {
591                        // Match by arn (in `jobDefinitions`), by name, or all.
592                        let arn_ok = wanted.is_empty()
593                            || jd
594                                .get("jobDefinitionArn")
595                                .and_then(Value::as_str)
596                                .map(|a| wanted.contains(a))
597                                .unwrap_or(false)
598                            || jd
599                                .get("jobDefinitionName")
600                                .and_then(Value::as_str)
601                                .map(|n| wanted.contains(n))
602                                .unwrap_or(false);
603                        let name_ok = name_filter.as_deref().is_none_or(|n| {
604                            jd.get("jobDefinitionName").and_then(Value::as_str) == Some(n)
605                        });
606                        let status_ok = status_filter
607                            .as_deref()
608                            .is_none_or(|s| jd.get("status").and_then(Value::as_str) == Some(s));
609                        arn_ok && name_ok && status_ok
610                    })
611                    .map(|jd| merge_tag_overlay(jd, "jobDefinitionArn", &st.tags))
612                    .collect()
613            })
614            .unwrap_or_default();
615        Ok(AwsResponse::ok_json(json!({ "jobDefinitions": items })))
616    }
617
618    fn deregister_job_definition(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
619        let body = req.json_body();
620        let id = body
621            .get("jobDefinition")
622            .and_then(Value::as_str)
623            .ok_or_else(|| client_error("ClientException", "jobDefinition is required"))?;
624        // Accept "name:revision" or an ARN ending in name:revision.
625        let key = id.rsplit('/').next().unwrap_or(id).to_string();
626        let mut accounts = self.state.write();
627        let st = accounts.get_or_create(&req.account_id);
628        if let Some(jd) = st.job_definitions.get_mut(&key) {
629            if let Some(o) = jd.as_object_mut() {
630                o.insert("status".into(), json!("INACTIVE"));
631            }
632        }
633        Ok(AwsResponse::ok_json(json!({})))
634    }
635
636    fn update_compute_environment(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
637        let body = req.json_body();
638        let name = arn_or_name(&body, "computeEnvironment")?;
639        let mut accounts = self.state.write();
640        let st = accounts.get_or_create(&req.account_id);
641        let ce = st
642            .compute_environments
643            .get_mut(&name)
644            .ok_or_else(|| client_error("ClientException", format!("Object not found: {name}")))?;
645        let arn = merge_updates(
646            ce,
647            &body,
648            &[
649                "state",
650                "desiredvCpus",
651                "computeResources",
652                "serviceRole",
653                "updatePolicy",
654            ],
655            "computeEnvironmentArn",
656        );
657        Ok(AwsResponse::ok_json(json!({
658            "computeEnvironmentName": name,
659            "computeEnvironmentArn": arn,
660        })))
661    }
662
663    fn update_job_queue(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
664        let body = req.json_body();
665        let name = arn_or_name(&body, "jobQueue")?;
666        let mut accounts = self.state.write();
667        let st = accounts.get_or_create(&req.account_id);
668        let q = st
669            .job_queues
670            .get_mut(&name)
671            .ok_or_else(|| client_error("ClientException", format!("Object not found: {name}")))?;
672        let arn = merge_updates(
673            q,
674            &body,
675            &[
676                "state",
677                "priority",
678                "computeEnvironmentOrder",
679                "schedulingPolicyArn",
680                "jobStateTimeLimitActions",
681            ],
682            "jobQueueArn",
683        );
684        Ok(AwsResponse::ok_json(json!({
685            "jobQueueName": name,
686            "jobQueueArn": arn,
687        })))
688    }
689
690    // ---- Scheduling policies ----
691
692    fn create_scheduling_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
693        let body = req.json_body();
694        let name = body
695            .get("name")
696            .and_then(Value::as_str)
697            .ok_or_else(|| client_error("ClientException", "name is required"))?
698            .to_string();
699        let arn = self.arn(
700            &req.account_id,
701            &req.region,
702            &format!("scheduling-policy/{name}"),
703        );
704        let mut stored = obj(&body);
705        stored.insert("arn".into(), json!(arn));
706        let mut accounts = self.state.write();
707        let st = accounts.get_or_create(&req.account_id);
708        if st.scheduling_policies.contains_key(&name) {
709            return Err(client_error(
710                "ClientException",
711                format!("Object already exists: {name}"),
712            ));
713        }
714        seed_inline_tags(&mut st.tags, &arn, &stored);
715        st.scheduling_policies
716            .insert(name.clone(), Value::Object(stored));
717        Ok(AwsResponse::ok_json(json!({ "name": name, "arn": arn })))
718    }
719
720    fn describe_scheduling_policies(
721        &self,
722        req: &AwsRequest,
723    ) -> Result<AwsResponse, AwsServiceError> {
724        let body = req.json_body();
725        let wanted = string_set(&body, "arns");
726        let accounts = self.state.read();
727        let items: Vec<Value> = accounts
728            .get(&req.account_id)
729            .map(|st| {
730                st.scheduling_policies
731                    .values()
732                    .filter(|p| {
733                        wanted.is_empty()
734                            || p.get("arn")
735                                .and_then(Value::as_str)
736                                .map(|a| wanted.contains(a))
737                                .unwrap_or(false)
738                    })
739                    .map(|p| merge_tag_overlay(p, "arn", &st.tags))
740                    .collect()
741            })
742            .unwrap_or_default();
743        Ok(AwsResponse::ok_json(json!({ "schedulingPolicies": items })))
744    }
745
746    fn list_scheduling_policies(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
747        let accounts = self.state.read();
748        let items: Vec<Value> = accounts
749            .get(&req.account_id)
750            .map(|st| {
751                st.scheduling_policies
752                    .values()
753                    .filter_map(|p| p.get("arn").map(|a| json!({ "arn": a })))
754                    .collect()
755            })
756            .unwrap_or_default();
757        Ok(AwsResponse::ok_json(json!({ "schedulingPolicies": items })))
758    }
759
760    fn update_scheduling_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
761        let body = req.json_body();
762        let name = arn_or_name(&body, "arn")?;
763        let mut accounts = self.state.write();
764        let st = accounts.get_or_create(&req.account_id);
765        let p = st
766            .scheduling_policies
767            .get_mut(&name)
768            .ok_or_else(|| client_error("ClientException", format!("Object not found: {name}")))?;
769        merge_updates(p, &body, &["fairsharePolicy"], "arn");
770        Ok(AwsResponse::ok_json(json!({})))
771    }
772
773    fn delete_scheduling_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
774        let body = req.json_body();
775        let name = arn_or_name(&body, "arn")?;
776        let mut accounts = self.state.write();
777        accounts
778            .get_or_create(&req.account_id)
779            .scheduling_policies
780            .remove(&name);
781        Ok(AwsResponse::ok_json(json!({})))
782    }
783
784    // ---- Jobs (control plane; real container execution lands in batch 2) ----
785
786    async fn submit_job(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
787        let body = req.json_body();
788        let job_name = body
789            .get("jobName")
790            .and_then(Value::as_str)
791            .ok_or_else(|| client_error("ClientException", "jobName is required"))?
792            .to_string();
793        let job_queue = body
794            .get("jobQueue")
795            .and_then(Value::as_str)
796            .ok_or_else(|| client_error("ClientException", "jobQueue is required"))?
797            .to_string();
798        // AWS rejects SubmitJob against a queue that doesn't exist.
799        {
800            let queue_name = job_queue.rsplit('/').next().unwrap_or(&job_queue);
801            let accounts = self.state.read();
802            let exists = accounts.get(&req.account_id).is_some_and(|st| {
803                st.job_queues.contains_key(queue_name)
804                    || st.job_queues.values().any(|q| {
805                        q.get("jobQueueArn").and_then(Value::as_str) == Some(job_queue.as_str())
806                    })
807            });
808            if !exists {
809                return Err(client_error(
810                    "ClientException",
811                    format!("Job queue {job_queue} does not exist"),
812                ));
813            }
814        }
815        let job_definition = body
816            .get("jobDefinition")
817            .and_then(Value::as_str)
818            .ok_or_else(|| client_error("ClientException", "jobDefinition is required"))?
819            .to_string();
820        let job_id = Uuid::new_v4().to_string();
821        let arn = self.arn(&req.account_id, &req.region, &format!("job/{job_id}"));
822        let now = chrono::Utc::now().timestamp_millis();
823        // AWS Batch caps an array job at 2..=10000 children; reject out-of-range
824        // sizes rather than synchronously spawning an unbounded number of
825        // container launches (a single-request resource-exhaustion vector).
826        let array_size = match body
827            .pointer("/arrayProperties/size")
828            .and_then(Value::as_i64)
829        {
830            Some(n) if (2..=10_000).contains(&n) => Some(n),
831            Some(n) => {
832                return Err(client_error(
833                    "ClientException",
834                    format!("Array job size must be between 2 and 10000, but was {n}"),
835                ));
836            }
837            None => None,
838        };
839        let depends_on: Vec<String> = body
840            .get("dependsOn")
841            .and_then(Value::as_array)
842            .map(|a| {
843                a.iter()
844                    .filter_map(|d| d.get("jobId").and_then(Value::as_str).map(String::from))
845                    .collect()
846            })
847            .unwrap_or_default();
848
849        let mut job = obj(&body);
850        job.insert("jobId".into(), json!(job_id));
851        job.insert("jobArn".into(), json!(arn));
852        job.insert("jobName".into(), json!(job_name));
853        job.insert("jobQueue".into(), json!(job_queue));
854        job.insert("jobDefinition".into(), json!(job_definition));
855        job.insert("createdAt".into(), json!(now));
856
857        // Resolve the job definition's container properties (+ this submit's
858        // containerOverrides) so the job runs the right image/command.
859        let container = self.resolve_container(&req.account_id, &job_definition, &body);
860
861        if let Some(size) = array_size {
862            // Array job: the parent is a tracking record; spawn `size` child
863            // jobs `<parent>:<index>`, each with AWS_BATCH_JOB_ARRAY_INDEX set,
864            // and run them. The parent's status + statusSummary are computed
865            // from the children at DescribeJobs time.
866            job.insert("status".into(), json!("PENDING"));
867            job.insert("arrayProperties".into(), json!({ "size": size }));
868            {
869                let mut accounts = self.state.write();
870                accounts
871                    .get_or_create(&req.account_id)
872                    .jobs
873                    .insert(job_id.clone(), Value::Object(job));
874            }
875            for index in 0..size {
876                let child_id = format!("{job_id}:{index}");
877                let child_arn = self.arn(&req.account_id, &req.region, &format!("job/{child_id}"));
878                let mut child = serde_json::Map::new();
879                child.insert("jobId".into(), json!(child_id));
880                child.insert("jobArn".into(), json!(child_arn));
881                child.insert("jobName".into(), json!(job_name));
882                child.insert("jobQueue".into(), json!(job_queue));
883                child.insert("jobDefinition".into(), json!(job_definition));
884                child.insert("status".into(), json!("SUBMITTED"));
885                child.insert("createdAt".into(), json!(now));
886                child.insert(
887                    "arrayProperties".into(),
888                    json!({ "index": index, "statusSummary": {} }),
889                );
890                {
891                    let mut accounts = self.state.write();
892                    accounts
893                        .get_or_create(&req.account_id)
894                        .jobs
895                        .insert(child_id.clone(), Value::Object(child));
896                }
897                let child_container = container.clone().map(|c| with_array_index_env(c, index));
898                self.launch_job(req, &child_id, &job_name, child_container, now)
899                    .await;
900            }
901        } else if !depends_on.is_empty() {
902            // Dependency gating: park at PENDING and spawn a waiter that
903            // launches once every dependency has SUCCEEDED (or fails this job if
904            // any dependency fails). Never blocks SubmitJob.
905            job.insert("status".into(), json!("PENDING"));
906            {
907                let mut accounts = self.state.write();
908                accounts
909                    .get_or_create(&req.account_id)
910                    .jobs
911                    .insert(job_id.clone(), Value::Object(job));
912            }
913            spawn_dependency_waiter(
914                self.launch_ctx(),
915                req.account_id.clone(),
916                req.region.clone(),
917                req.request_id.clone(),
918                job_id.clone(),
919                job_name.clone(),
920                container,
921                depends_on,
922                now,
923            );
924        } else {
925            job.insert("status".into(), json!("SUBMITTED"));
926            {
927                let mut accounts = self.state.write();
928                accounts
929                    .get_or_create(&req.account_id)
930                    .jobs
931                    .insert(job_id.clone(), Value::Object(job));
932            }
933            self.launch_job(req, &job_id, &job_name, container, now)
934                .await;
935        }
936
937        Ok(AwsResponse::ok_json(json!({
938            "jobArn": arn,
939            "jobName": job_name,
940            "jobId": job_id,
941        })))
942    }
943
944    /// Launch a REAL container-backed task on the ECS engine when wired and the
945    /// definition carries an image. With no ECS backend (unit tests / no
946    /// container runtime) the job stays SUBMITTED honestly — never an
947    /// auto-success, which is exactly the rival anti-pattern Batch beats.
948    /// Bundle the clones a spawned task needs so the launch path can run with
949    /// no `&self` (used by SubmitJob inline and by the dependency waiter).
950    fn launch_ctx(&self) -> LaunchCtx {
951        LaunchCtx {
952            batch_state: self.state.clone(),
953            ecs_state: self.ecs_state.clone(),
954            ecs_runtime: self.ecs_runtime.clone(),
955            snapshot_store: self.snapshot_store.clone(),
956            snapshot_lock: self.snapshot_lock.clone(),
957        }
958    }
959
960    async fn launch_job(
961        &self,
962        req: &AwsRequest,
963        job_id: &str,
964        job_name: &str,
965        container: Option<Value>,
966        now: i64,
967    ) {
968        launch(
969            &self.launch_ctx(),
970            &req.account_id,
971            &req.region,
972            &req.request_id,
973            job_id,
974            job_name,
975            container,
976            now,
977        )
978        .await;
979    }
980
981    /// Resolve `containerProperties` from the stored job definition (by
982    /// `name:revision`, bare name -> latest, or ARN), then overlay this
983    /// submit's `containerOverrides` (command / environment / resourceRequirements).
984    fn resolve_container(
985        &self,
986        account_id: &str,
987        job_definition: &str,
988        submit_body: &Value,
989    ) -> Option<Value> {
990        let key = job_definition.rsplit('/').next().unwrap_or(job_definition);
991        let accounts = self.state.read();
992        let st = accounts.get(account_id)?;
993        // Exact "name:revision", else the highest-revision entry for the name.
994        let jd = if key.contains(':') {
995            st.job_definitions.get(key).cloned()
996        } else {
997            st.job_definitions
998                .iter()
999                .filter(|(k, _)| k.rsplit_once(':').map(|(n, _)| n) == Some(key))
1000                .max_by_key(|(k, _)| {
1001                    k.rsplit_once(':')
1002                        .and_then(|(_, r)| r.parse::<i64>().ok())
1003                        .unwrap_or(0)
1004                })
1005                .map(|(_, v)| v.clone())
1006        }?;
1007        let mut container = jd.get("containerProperties")?.as_object()?.clone();
1008
1009        if let Some(ov) = submit_body
1010            .get("containerOverrides")
1011            .and_then(Value::as_object)
1012        {
1013            for f in ["command", "environment", "resourceRequirements"] {
1014                if let Some(v) = ov.get(f) {
1015                    container.insert(f.to_string(), v.clone());
1016                }
1017            }
1018        }
1019        Some(Value::Object(container))
1020    }
1021
1022    fn describe_jobs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1023        let body = req.json_body();
1024        let wanted = string_set(&body, "jobs");
1025        let accounts = self.state.read();
1026        let items: Vec<Value> = accounts
1027            .get(&req.account_id)
1028            .map(|st| {
1029                st.jobs
1030                    .values()
1031                    .filter(|j| {
1032                        wanted.is_empty()
1033                            || j.get("jobId")
1034                                .and_then(Value::as_str)
1035                                .map(|id| wanted.contains(id))
1036                                .unwrap_or(false)
1037                    })
1038                    .map(|j| {
1039                        // An array parent (arrayProperties.size set, no own ECS
1040                        // task) reflects its children's aggregate status +
1041                        // statusSummary computed live.
1042                        let id = j.get("jobId").and_then(Value::as_str).unwrap_or("");
1043                        let is_parent = j.pointer("/arrayProperties/size").is_some();
1044                        if !is_parent {
1045                            return j.clone();
1046                        }
1047                        let prefix = format!("{id}:");
1048                        let children: Vec<&Value> = st
1049                            .jobs
1050                            .iter()
1051                            .filter(|(k, _)| k.starts_with(&prefix))
1052                            .map(|(_, v)| v)
1053                            .collect();
1054                        let (summary, status) = array_status_summary(&children);
1055                        let mut out = j.clone();
1056                        if let Some(o) = out.as_object_mut() {
1057                            o.insert("status".into(), json!(status));
1058                            if let Some(ap) =
1059                                o.get_mut("arrayProperties").and_then(|v| v.as_object_mut())
1060                            {
1061                                ap.insert("statusSummary".into(), summary);
1062                            }
1063                        }
1064                        out
1065                    })
1066                    .collect()
1067            })
1068            .unwrap_or_default();
1069        Ok(AwsResponse::ok_json(json!({ "jobs": items })))
1070    }
1071
1072    fn list_jobs(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1073        let body = req.json_body();
1074        let queue = body.get("jobQueue").and_then(Value::as_str);
1075        let array_job_id = body.get("arrayJobId").and_then(Value::as_str);
1076        let multi_node_job_id = body.get("multiNodeJobId").and_then(Value::as_str);
1077        // AWS requires exactly one of jobQueue / arrayJobId / multiNodeJobId.
1078        let selectors = [queue, array_job_id, multi_node_job_id]
1079            .iter()
1080            .filter(|s| s.is_some())
1081            .count();
1082        if selectors != 1 {
1083            return Err(client_error(
1084                "ClientException",
1085                "The ListJobs request must specify exactly one of jobQueue, arrayJobId, or multiNodeJobId",
1086            ));
1087        }
1088        // "If you don't specify a status, only RUNNING jobs are returned."
1089        let status = body
1090            .get("jobStatus")
1091            .and_then(Value::as_str)
1092            .unwrap_or("RUNNING")
1093            .to_string();
1094        let max_results = body
1095            .get("maxResults")
1096            .and_then(Value::as_i64)
1097            .filter(|n| *n > 0)
1098            .map(|n| n.min(100) as usize)
1099            .unwrap_or(100);
1100        let start: usize = body
1101            .get("nextToken")
1102            .and_then(Value::as_str)
1103            .and_then(|t| t.parse().ok())
1104            .unwrap_or(0);
1105
1106        let accounts = self.state.read();
1107        let mut matched: Vec<&Value> = accounts
1108            .get(&req.account_id)
1109            .map(|st| {
1110                st.jobs
1111                    .values()
1112                    .filter(|j| {
1113                        let selector_ok = if let Some(q) = queue {
1114                            j.get("jobQueue").and_then(Value::as_str) == Some(q)
1115                        } else if let Some(a) = array_job_id {
1116                            j.get("jobId")
1117                                .and_then(Value::as_str)
1118                                .is_some_and(|id| id.starts_with(&format!("{a}:")))
1119                        } else if let Some(m) = multi_node_job_id {
1120                            j.get("jobId")
1121                                .and_then(Value::as_str)
1122                                .is_some_and(|id| id.starts_with(&format!("{m}#")))
1123                        } else {
1124                            false
1125                        };
1126                        selector_ok
1127                            && j.get("status").and_then(Value::as_str) == Some(status.as_str())
1128                    })
1129                    .collect()
1130            })
1131            .unwrap_or_default();
1132        // Most-recent-first, stable by jobId for ties.
1133        matched.sort_by(|a, b| {
1134            let ka = a.get("createdAt").and_then(Value::as_i64).unwrap_or(0);
1135            let kb = b.get("createdAt").and_then(Value::as_i64).unwrap_or(0);
1136            kb.cmp(&ka).then_with(|| {
1137                a.get("jobId")
1138                    .and_then(Value::as_str)
1139                    .cmp(&b.get("jobId").and_then(Value::as_str))
1140            })
1141        });
1142        let total = matched.len();
1143        let items: Vec<Value> = matched
1144            .into_iter()
1145            .skip(start)
1146            .take(max_results)
1147            .map(job_summary)
1148            .collect();
1149        let mut resp = serde_json::Map::new();
1150        resp.insert("jobSummaryList".into(), Value::Array(items));
1151        let next = start + max_results;
1152        if next < total {
1153            resp.insert("nextToken".into(), json!(next.to_string()));
1154        }
1155        Ok(AwsResponse::ok_json(Value::Object(resp)))
1156    }
1157
1158    fn cancel_job(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1159        self.stop_job(req, &["SUBMITTED", "PENDING", "RUNNABLE"], "CancelJob")
1160    }
1161
1162    fn terminate_job(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1163        self.stop_job(
1164            req,
1165            &["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"],
1166            "TerminateJob",
1167        )
1168    }
1169
1170    fn stop_job(
1171        &self,
1172        req: &AwsRequest,
1173        cancelable: &[&str],
1174        op: &str,
1175    ) -> Result<AwsResponse, AwsServiceError> {
1176        let body = req.json_body();
1177        let job_id = body
1178            .get("jobId")
1179            .and_then(Value::as_str)
1180            .ok_or_else(|| client_error("ClientException", "jobId is required"))?
1181            .to_string();
1182        let reason = body
1183            .get("reason")
1184            .and_then(Value::as_str)
1185            .unwrap_or(op)
1186            .to_string();
1187        let mut accounts = self.state.write();
1188        if let Some(job) = accounts
1189            .get_or_create(&req.account_id)
1190            .jobs
1191            .get_mut(&job_id)
1192        {
1193            if let Some(o) = job.as_object_mut() {
1194                let cur = o.get("status").and_then(Value::as_str).unwrap_or("");
1195                if cancelable.contains(&cur) {
1196                    o.insert("status".into(), json!("FAILED"));
1197                    o.insert("statusReason".into(), json!(reason));
1198                }
1199            }
1200        }
1201        Ok(AwsResponse::ok_json(json!({})))
1202    }
1203
1204    // ---- Tags ----
1205
1206    fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1207        let arn = req
1208            .path_segments
1209            .get(2)
1210            .map(|s| percent_decode(s))
1211            .ok_or_else(|| client_error("ClientException", "resourceArn is required"))?;
1212        let body = req.json_body();
1213        let tags = body
1214            .get("tags")
1215            .and_then(Value::as_object)
1216            .cloned()
1217            .unwrap_or_default();
1218        let mut accounts = self.state.write();
1219        let entry = accounts
1220            .get_or_create(&req.account_id)
1221            .tags
1222            .entry(arn)
1223            .or_default();
1224        for (k, v) in tags {
1225            if let Some(s) = v.as_str() {
1226                entry.insert(k, s.to_string());
1227            }
1228        }
1229        Ok(AwsResponse::ok_json(json!({})))
1230    }
1231
1232    fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1233        let arn = req
1234            .path_segments
1235            .get(2)
1236            .map(|s| percent_decode(s))
1237            .ok_or_else(|| client_error("ClientException", "resourceArn is required"))?;
1238        let keys: Vec<String> = req
1239            .query_params
1240            .iter()
1241            .filter(|(k, _)| k.as_str() == "tagKeys" || k.starts_with("tagKeys"))
1242            .map(|(_, v)| v.clone())
1243            .collect();
1244        let mut accounts = self.state.write();
1245        if let Some(entry) = accounts.get_or_create(&req.account_id).tags.get_mut(&arn) {
1246            for k in keys {
1247                entry.remove(&k);
1248            }
1249        }
1250        Ok(AwsResponse::ok_json(json!({})))
1251    }
1252
1253    fn list_tags_for_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1254        let arn = req
1255            .path_segments
1256            .get(2)
1257            .map(|s| percent_decode(s))
1258            .ok_or_else(|| client_error("ClientException", "resourceArn is required"))?;
1259        let accounts = self.state.read();
1260        let tags = accounts
1261            .get(&req.account_id)
1262            .and_then(|st| st.tags.get(&arn))
1263            .map(|m| {
1264                m.iter()
1265                    .map(|(k, v)| (k.clone(), json!(v)))
1266                    .collect::<Map<String, Value>>()
1267            })
1268            .unwrap_or_default();
1269        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
1270    }
1271}
1272
1273/// Apply the named mutable fields from `body` onto the stored resource and
1274/// return its ARN (read from `arn_key`).
1275fn merge_updates(stored: &mut Value, body: &Value, fields: &[&str], arn_key: &str) -> String {
1276    if let Some(o) = stored.as_object_mut() {
1277        for f in fields {
1278            if let Some(v) = body.get(*f) {
1279                o.insert((*f).to_string(), v.clone());
1280            }
1281        }
1282        o.get(arn_key)
1283            .and_then(Value::as_str)
1284            .unwrap_or_default()
1285            .to_string()
1286    } else {
1287        String::new()
1288    }
1289}
1290
1291/// Everything the launch path needs without a `&self`, so a spawned task (the
1292/// dependency waiter) can launch a job exactly like the inline SubmitJob path.
1293#[derive(Clone)]
1294struct LaunchCtx {
1295    batch_state: SharedBatchState,
1296    ecs_state: Option<fakecloud_ecs::SharedEcsState>,
1297    ecs_runtime: Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
1298    snapshot_store: Option<Arc<dyn SnapshotStore>>,
1299    snapshot_lock: Arc<AsyncMutex<()>>,
1300}
1301
1302/// Launch a REAL container-backed task on the ECS engine when the backend is
1303/// wired and the definition carries an image; otherwise the job stays at its
1304/// current status honestly (never an auto-success). Updates the Batch job to
1305/// STARTING + spawns the status sync, or FAILED on a launch error.
1306#[allow(clippy::too_many_arguments)]
1307async fn launch(
1308    ctx: &LaunchCtx,
1309    account: &str,
1310    region: &str,
1311    request_id: &str,
1312    job_id: &str,
1313    job_name: &str,
1314    container: Option<Value>,
1315    now: i64,
1316) {
1317    let (Some(ecs_state), Some(container)) = (ctx.ecs_state.clone(), container) else {
1318        return;
1319    };
1320    if container.get("image").and_then(Value::as_str).is_none() {
1321        return;
1322    }
1323    let src = bare_request(account, region, request_id);
1324    match launch_ecs_task(
1325        &ecs_state,
1326        &ctx.ecs_runtime,
1327        &src,
1328        job_id,
1329        job_name,
1330        &container,
1331    )
1332    .await
1333    {
1334        Ok((cluster, task_arn)) => {
1335            {
1336                let mut accounts = ctx.batch_state.write();
1337                if let Some(j) = accounts
1338                    .get_or_create(account)
1339                    .jobs
1340                    .get_mut(job_id)
1341                    .and_then(|j| j.as_object_mut())
1342                {
1343                    j.insert("status".into(), json!("STARTING"));
1344                    j.insert("ecsCluster".into(), json!(cluster));
1345                    j.insert("ecsTaskArn".into(), json!(task_arn));
1346                    j.insert("startedAt".into(), json!(now));
1347                }
1348            }
1349            spawn_status_sync(
1350                ctx,
1351                ecs_state,
1352                account.to_string(),
1353                region.to_string(),
1354                request_id.to_string(),
1355                job_id.to_string(),
1356                job_name.to_string(),
1357                cluster,
1358                task_arn,
1359                container,
1360            );
1361        }
1362        Err(err) => {
1363            let mut accounts = ctx.batch_state.write();
1364            if let Some(j) = accounts
1365                .get_or_create(account)
1366                .jobs
1367                .get_mut(job_id)
1368                .and_then(|j| j.as_object_mut())
1369            {
1370                j.insert("status".into(), json!("FAILED"));
1371                j.insert("statusReason".into(), json!(err.message().to_string()));
1372            }
1373        }
1374    }
1375}
1376
1377/// Background waiter for a job with `dependsOn`: poll the dependency job
1378/// statuses; launch this job once they have all SUCCEEDED, or fail it if any
1379/// dependency reaches FAILED. Bounded so a never-finishing dependency can't
1380/// wait forever.
1381#[allow(clippy::too_many_arguments)]
1382fn spawn_dependency_waiter(
1383    ctx: LaunchCtx,
1384    account: String,
1385    region: String,
1386    request_id: String,
1387    job_id: String,
1388    job_name: String,
1389    container: Option<Value>,
1390    depends_on: Vec<String>,
1391    now: i64,
1392) {
1393    tokio::spawn(async move {
1394        for _ in 0..1800u32 {
1395            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1396            let statuses: Vec<String> = {
1397                let accounts = ctx.batch_state.read();
1398                let jobs = accounts.get(&account).map(|s| &s.jobs);
1399                depends_on
1400                    .iter()
1401                    .map(|d| {
1402                        jobs.and_then(|m| m.get(d))
1403                            .and_then(|j| j.get("status").and_then(Value::as_str))
1404                            .unwrap_or("")
1405                            .to_string()
1406                    })
1407                    .collect()
1408            };
1409            if statuses.iter().any(|s| s == "FAILED") {
1410                {
1411                    let mut accounts = ctx.batch_state.write();
1412                    if let Some(j) = accounts
1413                        .get_or_create(&account)
1414                        .jobs
1415                        .get_mut(&job_id)
1416                        .and_then(|j| j.as_object_mut())
1417                    {
1418                        j.insert("status".into(), json!("FAILED"));
1419                        j.insert("statusReason".into(), json!("Dependent job failed"));
1420                    }
1421                }
1422                save_snapshot_now(&ctx.batch_state, &ctx.snapshot_store, &ctx.snapshot_lock).await;
1423                return;
1424            }
1425            if statuses.iter().all(|s| s == "SUCCEEDED") {
1426                launch(
1427                    &ctx,
1428                    &account,
1429                    &region,
1430                    &request_id,
1431                    &job_id,
1432                    &job_name,
1433                    container,
1434                    now,
1435                )
1436                .await;
1437                return;
1438            }
1439        }
1440    });
1441}
1442
1443/// Ensure the batch ECS cluster exists, register a task definition from the
1444/// job's container properties, and RunTask it. Returns (cluster, taskArn).
1445/// Cross-service calls go through ECS's public `handle()` — reusing all of
1446/// ECS's real container / portability / k8s handling, not re-deriving it.
1447async fn launch_ecs_task(
1448    ecs_state: &fakecloud_ecs::SharedEcsState,
1449    ecs_runtime: &Option<Arc<fakecloud_ecs::runtime::EcsRuntime>>,
1450    src: &AwsRequest,
1451    job_id: &str,
1452    job_name: &str,
1453    container: &Value,
1454) -> Result<(String, String), AwsServiceError> {
1455    let mut ecs = fakecloud_ecs::EcsService::new(ecs_state.clone());
1456    if let Some(rt) = ecs_runtime.clone() {
1457        ecs = ecs.with_runtime(rt);
1458    }
1459    let cluster = "fakecloud-batch".to_string();
1460    let _ = ecs
1461        .handle(ecs_request(
1462            "CreateCluster",
1463            json!({ "clusterName": cluster }),
1464            src,
1465        ))
1466        .await;
1467
1468    let image = container.get("image").and_then(Value::as_str).unwrap_or("");
1469    let (vcpus, memory) = container_resources(container);
1470    let mut cdef = serde_json::Map::new();
1471    cdef.insert("name".into(), json!("default"));
1472    cdef.insert("image".into(), json!(image));
1473    cdef.insert("essential".into(), json!(true));
1474    cdef.insert("cpu".into(), json!((vcpus * 1024.0).round() as i64));
1475    cdef.insert("memory".into(), json!(memory));
1476    if let Some(cmd) = container.get("command").filter(|v| v.is_array()) {
1477        cdef.insert("command".into(), cmd.clone());
1478    }
1479    if let Some(env) = container.get("environment").filter(|v| v.is_array()) {
1480        cdef.insert("environment".into(), env.clone());
1481    }
1482    let family = format!("batch-{job_name}");
1483    let reg = ecs
1484        .handle(ecs_request(
1485            "RegisterTaskDefinition",
1486            json!({
1487                "family": family,
1488                "containerDefinitions": [Value::Object(cdef)],
1489                "networkMode": "bridge",
1490                "requiresCompatibilities": ["EC2"],
1491            }),
1492            src,
1493        ))
1494        .await?;
1495    let reg_body: Value = parse_body(&reg);
1496    let task_def_arn = reg_body
1497        .pointer("/taskDefinition/taskDefinitionArn")
1498        .and_then(Value::as_str)
1499        .map(String::from)
1500        .unwrap_or(family);
1501
1502    let run = ecs
1503        .handle(ecs_request(
1504            "RunTask",
1505            json!({
1506                "cluster": cluster,
1507                "taskDefinition": task_def_arn,
1508                "count": 1,
1509                "launchType": "EC2",
1510                "startedBy": format!("batch:{job_id}"),
1511            }),
1512            src,
1513        ))
1514        .await?;
1515    let run_body: Value = parse_body(&run);
1516    let task_arn = run_body
1517        .pointer("/tasks/0/taskArn")
1518        .and_then(Value::as_str)
1519        .map(String::from)
1520        .ok_or_else(|| client_error("ServerException", "RunTask returned no task"))?;
1521    Ok((cluster, task_arn))
1522}
1523
1524/// Poll the ECS task in the background and map its real lifecycle + container
1525/// exit code onto the Batch job status. NO auto-success: the job only reaches
1526/// SUCCEEDED when the real container exits 0. Honors the job's `retryStrategy`
1527/// (re-launch a failed attempt up to `attempts` times) and `timeout`
1528/// (`attemptDurationSeconds` caps each attempt).
1529#[allow(clippy::too_many_arguments)]
1530fn spawn_status_sync(
1531    ctx: &LaunchCtx,
1532    ecs_state: fakecloud_ecs::SharedEcsState,
1533    account_id: String,
1534    region: String,
1535    request_id: String,
1536    job_id: String,
1537    job_name: String,
1538    cluster: String,
1539    task_arn: String,
1540    container: Value,
1541) {
1542    let batch_state = ctx.batch_state.clone();
1543    let snapshot_store = ctx.snapshot_store.clone();
1544    let snapshot_lock = ctx.snapshot_lock.clone();
1545    let ecs_runtime = ctx.ecs_runtime.clone();
1546    tokio::spawn(async move {
1547        let ecs = fakecloud_ecs::EcsService::new(ecs_state.clone());
1548        let src = bare_request(&account_id, &region, &request_id);
1549        // retryStrategy.attempts (1-10) and timeout.attemptDurationSeconds.
1550        let (max_attempts, timeout_secs) = {
1551            let accounts = batch_state.read();
1552            let j = accounts.get(&account_id).and_then(|s| s.jobs.get(&job_id));
1553            let ma = j
1554                .and_then(|j| j.pointer("/retryStrategy/attempts"))
1555                .and_then(Value::as_i64)
1556                .unwrap_or(1)
1557                .clamp(1, 10);
1558            let to = j
1559                .and_then(|j| j.pointer("/timeout/attemptDurationSeconds"))
1560                .and_then(Value::as_i64)
1561                .filter(|t| *t > 0);
1562            (ma, to)
1563        };
1564        let max_polls = timeout_secs.unwrap_or(900).min(900) as u32;
1565        let mut task = task_arn;
1566        let mut attempt: i64 = 1;
1567        loop {
1568            // Poll this attempt's task until it stops or the timeout elapses.
1569            let mut outcome: Option<(Option<i64>, Option<String>)> = None; // (exitCode, reason)
1570            let mut succeeded = false;
1571            for _ in 0..max_polls {
1572                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1573                let resp = match ecs
1574                    .handle(ecs_request(
1575                        "DescribeTasks",
1576                        json!({ "cluster": cluster, "tasks": [task] }),
1577                        &src,
1578                    ))
1579                    .await
1580                {
1581                    Ok(r) => r,
1582                    Err(_) => continue,
1583                };
1584                let body = parse_body(&resp);
1585                let Some(t) = body.pointer("/tasks/0") else {
1586                    continue;
1587                };
1588                match t.get("lastStatus").and_then(Value::as_str).unwrap_or("") {
1589                    "RUNNING" => {
1590                        let mut accounts = batch_state.write();
1591                        if let Some(j) = accounts
1592                            .get_or_create(&account_id)
1593                            .jobs
1594                            .get_mut(&job_id)
1595                            .and_then(|j| j.as_object_mut())
1596                        {
1597                            if j.get("status").and_then(Value::as_str) != Some("RUNNING") {
1598                                j.insert("status".into(), json!("RUNNING"));
1599                            }
1600                        }
1601                    }
1602                    "STOPPED" => {
1603                        let code = t.pointer("/containers/0/exitCode").and_then(Value::as_i64);
1604                        let reason = t
1605                            .get("stoppedReason")
1606                            .and_then(Value::as_str)
1607                            .map(String::from);
1608                        if code == Some(0) {
1609                            succeeded = true;
1610                        } else {
1611                            outcome =
1612                                Some((code, reason.or(Some("Essential container exited".into()))));
1613                        }
1614                        break;
1615                    }
1616                    _ => continue,
1617                }
1618            }
1619
1620            if succeeded {
1621                set_job_terminal(
1622                    &batch_state,
1623                    &account_id,
1624                    &job_id,
1625                    "SUCCEEDED",
1626                    Some(0),
1627                    None,
1628                );
1629                save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1630                break;
1631            }
1632
1633            // Failure (or timeout when `outcome` is None).
1634            let (exit_code, reason) =
1635                outcome.unwrap_or((None, Some("Job attempt duration exceeded timeout".into())));
1636            if attempt < max_attempts {
1637                // Record the failed attempt and re-launch a fresh task.
1638                {
1639                    let mut accounts = batch_state.write();
1640                    if let Some(j) = accounts
1641                        .get_or_create(&account_id)
1642                        .jobs
1643                        .get_mut(&job_id)
1644                        .and_then(|j| j.as_object_mut())
1645                    {
1646                        let attempts = j.entry("attempts".to_string()).or_insert_with(|| json!([]));
1647                        if let Some(a) = attempts.as_array_mut() {
1648                            a.push(json!({
1649                                "exitCode": exit_code,
1650                                "statusReason": reason,
1651                            }));
1652                        }
1653                        j.insert("status".into(), json!("RUNNABLE"));
1654                    }
1655                }
1656                save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1657                match launch_ecs_task(
1658                    &ecs_state,
1659                    &ecs_runtime,
1660                    &src,
1661                    &job_id,
1662                    &job_name,
1663                    &container,
1664                )
1665                .await
1666                {
1667                    Ok((_, new_task)) => {
1668                        task = new_task;
1669                        attempt += 1;
1670                        let mut accounts = batch_state.write();
1671                        if let Some(j) = accounts
1672                            .get_or_create(&account_id)
1673                            .jobs
1674                            .get_mut(&job_id)
1675                            .and_then(|j| j.as_object_mut())
1676                        {
1677                            j.insert("status".into(), json!("STARTING"));
1678                            j.insert("ecsTaskArn".into(), json!(task));
1679                        }
1680                        continue;
1681                    }
1682                    Err(_) => {
1683                        set_job_terminal(
1684                            &batch_state,
1685                            &account_id,
1686                            &job_id,
1687                            "FAILED",
1688                            exit_code,
1689                            reason,
1690                        );
1691                        save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1692                        break;
1693                    }
1694                }
1695            } else {
1696                set_job_terminal(
1697                    &batch_state,
1698                    &account_id,
1699                    &job_id,
1700                    "FAILED",
1701                    exit_code,
1702                    reason,
1703                );
1704                save_snapshot_now(&batch_state, &snapshot_store, &snapshot_lock).await;
1705                break;
1706            }
1707        }
1708    });
1709}
1710
1711/// Write a job's terminal status + `container.exitCode`/reason + `stoppedAt`.
1712fn set_job_terminal(
1713    batch_state: &SharedBatchState,
1714    account_id: &str,
1715    job_id: &str,
1716    status: &str,
1717    exit_code: Option<i64>,
1718    reason: Option<String>,
1719) {
1720    let mut accounts = batch_state.write();
1721    if let Some(j) = accounts
1722        .get_or_create(account_id)
1723        .jobs
1724        .get_mut(job_id)
1725        .and_then(|j| j.as_object_mut())
1726    {
1727        j.insert("status".into(), json!(status));
1728        if let Some(c) = exit_code {
1729            let container = j
1730                .entry("container".to_string())
1731                .or_insert_with(|| json!({}));
1732            if let Some(o) = container.as_object_mut() {
1733                o.insert("exitCode".into(), json!(c));
1734            }
1735        }
1736        if let Some(r) = reason {
1737            j.insert("statusReason".into(), json!(r.clone()));
1738            if let Some(o) = j.get_mut("container").and_then(|v| v.as_object_mut()) {
1739                o.insert("reason".into(), json!(r));
1740            }
1741        }
1742        j.insert(
1743            "stoppedAt".into(),
1744            json!(chrono::Utc::now().timestamp_millis()),
1745        );
1746    }
1747}
1748
1749/// Build an ECS `AwsRequest` (awsJson1.1) carrying the originating
1750/// account/region so the launched task lands in the caller's account.
1751fn ecs_request(action: &str, body: Value, src: &AwsRequest) -> AwsRequest {
1752    AwsRequest {
1753        service: "ecs".to_string(),
1754        action: action.to_string(),
1755        region: src.region.clone(),
1756        account_id: src.account_id.clone(),
1757        request_id: src.request_id.clone(),
1758        headers: http::HeaderMap::new(),
1759        query_params: std::collections::HashMap::new(),
1760        body: bytes::Bytes::from(serde_json::to_vec(&body).unwrap_or_default()),
1761        body_stream: parking_lot::Mutex::new(None),
1762        path_segments: Vec::new(),
1763        raw_path: "/".to_string(),
1764        raw_query: String::new(),
1765        method: Method::POST,
1766        is_query_protocol: false,
1767        access_key_id: None,
1768        principal: None,
1769    }
1770}
1771
1772/// A minimal `AwsRequest` for the background poller (carries only the
1773/// originating account/region/request-id).
1774fn bare_request(account_id: &str, region: &str, request_id: &str) -> AwsRequest {
1775    AwsRequest {
1776        service: "batch".to_string(),
1777        action: String::new(),
1778        region: region.to_string(),
1779        account_id: account_id.to_string(),
1780        request_id: request_id.to_string(),
1781        headers: http::HeaderMap::new(),
1782        query_params: std::collections::HashMap::new(),
1783        body: bytes::Bytes::new(),
1784        body_stream: parking_lot::Mutex::new(None),
1785        path_segments: Vec::new(),
1786        raw_path: "/".to_string(),
1787        raw_query: String::new(),
1788        method: Method::POST,
1789        is_query_protocol: false,
1790        access_key_id: None,
1791        principal: None,
1792    }
1793}
1794
1795/// Parse a JSON `AwsResponse` body, or `Null` on failure.
1796fn parse_body(resp: &AwsResponse) -> Value {
1797    serde_json::from_slice(resp.body.expect_bytes()).unwrap_or(Value::Null)
1798}
1799
1800/// Resolve (vcpus, memoryMiB) from Batch container properties — either the
1801/// legacy `vcpus`/`memory` fields or the modern `resourceRequirements` list.
1802fn container_resources(container: &Value) -> (f64, i64) {
1803    let mut vcpus = container
1804        .get("vcpus")
1805        .and_then(Value::as_f64)
1806        .unwrap_or(1.0);
1807    let mut memory = container
1808        .get("memory")
1809        .and_then(Value::as_i64)
1810        .unwrap_or(512);
1811    if let Some(rr) = container
1812        .get("resourceRequirements")
1813        .and_then(Value::as_array)
1814    {
1815        for r in rr {
1816            let ty = r.get("type").and_then(Value::as_str).unwrap_or("");
1817            let val = r
1818                .get("value")
1819                .and_then(Value::as_str)
1820                .and_then(|s| s.parse::<f64>().ok());
1821            match (ty, val) {
1822                ("VCPU", Some(v)) => vcpus = v,
1823                ("MEMORY", Some(v)) => memory = v as i64,
1824                _ => {}
1825            }
1826        }
1827    }
1828    (vcpus.max(0.25), memory.max(4))
1829}
1830
1831/// Persist the Batch state now (used by the background status poller, which
1832/// can't call the `&self` snapshot method).
1833async fn save_snapshot_now(
1834    state: &SharedBatchState,
1835    store: &Option<Arc<dyn SnapshotStore>>,
1836    lock: &Arc<AsyncMutex<()>>,
1837) {
1838    let Some(store) = store.clone() else {
1839        return;
1840    };
1841    let _guard = lock.lock().await;
1842    let bytes = {
1843        let snap = BatchSnapshot {
1844            schema_version: BATCH_SNAPSHOT_SCHEMA_VERSION,
1845            accounts: Some(state.read().clone()),
1846        };
1847        serde_json::to_vec(&snap).unwrap_or_default()
1848    };
1849    let _ = tokio::task::spawn_blocking(move || store.save(&bytes)).await;
1850}
1851
1852/// Inject `AWS_BATCH_JOB_ARRAY_INDEX=<index>` into a container's environment so
1853/// each array child can select its slice of work (the AWS Batch contract).
1854fn with_array_index_env(mut container: Value, index: i64) -> Value {
1855    if let Some(obj) = container.as_object_mut() {
1856        let env = obj
1857            .entry("environment".to_string())
1858            .or_insert_with(|| json!([]));
1859        if let Some(arr) = env.as_array_mut() {
1860            arr.retain(|e| {
1861                e.get("name").and_then(Value::as_str) != Some("AWS_BATCH_JOB_ARRAY_INDEX")
1862            });
1863            arr.push(json!({ "name": "AWS_BATCH_JOB_ARRAY_INDEX", "value": index.to_string() }));
1864        }
1865    }
1866    container
1867}
1868
1869/// Aggregate an array parent's child statuses into AWS Batch's
1870/// `arrayProperties.statusSummary` counts + an overall parent status.
1871fn array_status_summary(children: &[&Value]) -> (Value, &'static str) {
1872    let mut summary = serde_json::Map::new();
1873    for s in [
1874        "SUBMITTED",
1875        "PENDING",
1876        "RUNNABLE",
1877        "STARTING",
1878        "RUNNING",
1879        "SUCCEEDED",
1880        "FAILED",
1881    ] {
1882        let n = children
1883            .iter()
1884            .filter(|c| c.get("status").and_then(Value::as_str) == Some(s))
1885            .count();
1886        summary.insert(s.to_string(), json!(n));
1887    }
1888    let total = children.len();
1889    let succeeded = summary["SUCCEEDED"].as_u64().unwrap_or(0) as usize;
1890    let failed = summary["FAILED"].as_u64().unwrap_or(0) as usize;
1891    let status = if total > 0 && succeeded + failed == total {
1892        if failed > 0 {
1893            "FAILED"
1894        } else {
1895            "SUCCEEDED"
1896        }
1897    } else if summary["RUNNING"].as_u64().unwrap_or(0) > 0
1898        || summary["STARTING"].as_u64().unwrap_or(0) > 0
1899    {
1900        "RUNNING"
1901    } else {
1902        "PENDING"
1903    };
1904    (Value::Object(summary), status)
1905}
1906
1907/// Read a JSON string array into a set of owned strings.
1908fn string_set(body: &Value, key: &str) -> std::collections::HashSet<String> {
1909    body.get(key)
1910        .and_then(Value::as_array)
1911        .map(|a| {
1912            a.iter()
1913                .filter_map(|v| v.as_str().map(String::from))
1914                .collect()
1915        })
1916        .unwrap_or_default()
1917}
1918
1919/// True if `wanted` is empty (match-all) or the resource's name/arn is wanted.
1920fn match_named(
1921    res: &Value,
1922    wanted: &std::collections::HashSet<String>,
1923    name_key: &str,
1924    arn_key: &str,
1925) -> bool {
1926    if wanted.is_empty() {
1927        return true;
1928    }
1929    let name = res.get(name_key).and_then(Value::as_str);
1930    let arn = res.get(arn_key).and_then(Value::as_str);
1931    name.map(|n| wanted.contains(n)).unwrap_or(false)
1932        || arn.map(|a| wanted.contains(a)).unwrap_or(false)
1933}
1934
1935/// Resolve a delete/describe identifier that may be a name or an ARN to the
1936/// store key (the resource name = last ARN path segment).
1937fn arn_or_name(body: &Value, key: &str) -> Result<String, AwsServiceError> {
1938    let raw = body
1939        .get(key)
1940        .and_then(Value::as_str)
1941        .ok_or_else(|| client_error("ClientException", format!("{key} is required")))?;
1942    Ok(raw.rsplit('/').next().unwrap_or(raw).to_string())
1943}
1944
/// Numeric value (0-15) of an ASCII hexadecimal digit in either case, or
/// `None` for any other byte.
fn hex_val(b: u8) -> Option<u8> {
    // `to_digit(16)` yields at most 15, so narrowing to `u8` is lossless.
    char::from(b).to_digit(16).map(|digit| digit as u8)
}
1953
1954fn percent_decode(s: &str) -> String {
1955    // ARNs in the path are percent-encoded by SDKs; decode %XX escapes on the
1956    // raw BYTES and reassemble as UTF-8. Slicing the &str directly
1957    // (`&s[i+1..i+3]`) panics when a `%` sits within a multi-byte char boundary
1958    // (e.g. `%€`), which would kill the request task and drop the connection.
1959    let bytes = s.as_bytes();
1960    let mut out = Vec::with_capacity(bytes.len());
1961    let mut i = 0;
1962    while i < bytes.len() {
1963        if bytes[i] == b'%' && i + 2 < bytes.len() {
1964            if let (Some(hi), Some(lo)) = (hex_val(bytes[i + 1]), hex_val(bytes[i + 2])) {
1965                out.push(hi * 16 + lo);
1966                i += 3;
1967                continue;
1968            }
1969        }
1970        out.push(bytes[i]);
1971        i += 1;
1972    }
1973    String::from_utf8_lossy(&out).into_owned()
1974}
1975
1976#[async_trait]
1977impl AwsService for BatchService {
1978    fn service_name(&self) -> &str {
1979        "batch"
1980    }
1981
1982    fn supported_actions(&self) -> &[&str] {
1983        SUPPORTED_ACTIONS
1984    }
1985
1986    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1987        let Some(action) = Self::resolve_action(&req) else {
1988            return Err(AwsServiceError::aws_error(
1989                StatusCode::NOT_FOUND,
1990                "ResourceNotFoundException",
1991                format!("Unknown operation: {} {}", req.method, req.raw_path),
1992            ));
1993        };
1994        // SubmitJob is async (it launches a real ECS task); everything else is
1995        // a synchronous metadata op.
1996        let result = if action == "SubmitJob" {
1997            self.submit_job(&req).await
1998        } else {
1999            self.dispatch(action, &req)
2000        };
2001        if MUTATING_ACTIONS.contains(&action)
2002            && matches!(result.as_ref(), Ok(resp) if resp.status.is_success())
2003        {
2004            self.save_snapshot().await;
2005        }
2006        result
2007    }
2008}
2009
2010#[cfg(test)]
2011mod tests {
2012    use super::*;
2013    use crate::state::BatchAccounts;
2014    use parking_lot::RwLock;
2015    use std::collections::HashMap;
2016
2017    fn svc() -> BatchService {
2018        BatchService::new(Arc::new(RwLock::new(BatchAccounts::new())))
2019    }
2020
2021    fn req(path: &str, body: Value) -> AwsRequest {
2022        let p = path.split('?').next().unwrap_or(path);
2023        let path_segments: Vec<String> = p
2024            .split('/')
2025            .filter(|s| !s.is_empty())
2026            .map(String::from)
2027            .collect();
2028        AwsRequest {
2029            service: "batch".into(),
2030            action: String::new(),
2031            region: "us-east-1".into(),
2032            account_id: "123456789012".into(),
2033            request_id: "t".into(),
2034            headers: http::HeaderMap::new(),
2035            query_params: HashMap::new(),
2036            body: bytes::Bytes::from(serde_json::to_vec(&body).unwrap()),
2037            body_stream: parking_lot::Mutex::new(None),
2038            path_segments,
2039            raw_path: path.to_string(),
2040            raw_query: String::new(),
2041            method: Method::POST,
2042            is_query_protocol: false,
2043            access_key_id: None,
2044            principal: None,
2045        }
2046    }
2047
2048    fn body_of(r: AwsResponse) -> Value {
2049        serde_json::from_slice(r.body.expect_bytes()).unwrap()
2050    }
2051
2052    async fn mk_queue(s: &BatchService, name: &str) {
2053        s.handle(req(
2054            "/v1/createjobqueue",
2055            json!({"jobQueueName": name, "priority": 1}),
2056        ))
2057        .await
2058        .unwrap();
2059    }
2060
2061    #[tokio::test]
2062    async fn compute_environment_lifecycle() {
2063        let s = svc();
2064        let r = s
2065            .handle(req(
2066                "/v1/createcomputeenvironment",
2067                json!({"computeEnvironmentName": "ce1", "type": "MANAGED"}),
2068            ))
2069            .await
2070            .unwrap();
2071        let v = body_of(r);
2072        assert_eq!(v["computeEnvironmentName"], "ce1");
2073        assert!(v["computeEnvironmentArn"]
2074            .as_str()
2075            .unwrap()
2076            .contains("compute-environment/ce1"));
2077
2078        let d = body_of(
2079            s.handle(req("/v1/describecomputeenvironments", json!({})))
2080                .await
2081                .unwrap(),
2082        );
2083        let ces = d["computeEnvironments"].as_array().unwrap();
2084        assert_eq!(ces.len(), 1);
2085        assert_eq!(ces[0]["status"], "VALID");
2086        assert_eq!(ces[0]["state"], "ENABLED");
2087
2088        s.handle(req(
2089            "/v1/deletecomputeenvironment",
2090            json!({"computeEnvironment": "ce1"}),
2091        ))
2092        .await
2093        .unwrap();
2094        let d2 = body_of(
2095            s.handle(req("/v1/describecomputeenvironments", json!({})))
2096                .await
2097                .unwrap(),
2098        );
2099        assert_eq!(d2["computeEnvironments"].as_array().unwrap().len(), 0);
2100    }
2101
2102    #[tokio::test]
2103    async fn job_definition_revisions_increment() {
2104        let s = svc();
2105        for expected in 1..=3 {
2106            let v = body_of(
2107                s.handle(req(
2108                    "/v1/registerjobdefinition",
2109                    json!({"jobDefinitionName": "jd", "type": "container"}),
2110                ))
2111                .await
2112                .unwrap(),
2113            );
2114            assert_eq!(v["revision"], expected);
2115        }
2116        let d = body_of(
2117            s.handle(req(
2118                "/v1/describejobdefinitions",
2119                json!({"jobDefinitionName": "jd"}),
2120            ))
2121            .await
2122            .unwrap(),
2123        );
2124        assert_eq!(d["jobDefinitions"].as_array().unwrap().len(), 3);
2125    }
2126
2127    #[tokio::test]
2128    async fn unimplemented_op_errors_not_fakes() {
2129        let s = svc();
2130        // CreateConsumableResource is not yet implemented (a later batch); it
2131        // must return a faithful 501, never a fake success.
2132        let err = match s
2133            .handle(req(
2134                "/v1/createconsumableresource",
2135                json!({"consumableResourceName": "r"}),
2136            ))
2137            .await
2138        {
2139            Err(e) => e,
2140            Ok(_) => panic!("unimplemented op must not fake-succeed"),
2141        };
2142        assert_eq!(err.status(), StatusCode::NOT_IMPLEMENTED);
2143    }
2144
2145    #[tokio::test]
2146    async fn tags_agree_across_describe_and_list_tags() {
2147        let s = svc();
2148        // Create WITH inline tags.
2149        let c = body_of(
2150            s.handle(req(
2151                "/v1/createcomputeenvironment",
2152                json!({"computeEnvironmentName": "ce", "type": "MANAGED",
2153                       "tags": {"team": "data"}}),
2154            ))
2155            .await
2156            .unwrap(),
2157        );
2158        let arn = c["computeEnvironmentArn"].as_str().unwrap().to_string();
2159        let enc = arn.replace('/', "%2F");
2160
2161        // ListTagsForResource reflects the create-time tags (used to be empty:
2162        // create wrote them inline, ListTags read a separate store).
2163        let mut lt = req(&format!("/v1/tags/{enc}"), json!({}));
2164        lt.method = Method::GET;
2165        let tags = body_of(s.handle(lt).await.unwrap());
2166        assert_eq!(tags["tags"]["team"], "data");
2167
2168        // TagResource adds a tag; Describe* now reflects it (used to diverge).
2169        let mut tr = req(&format!("/v1/tags/{enc}"), json!({"tags": {"env": "prod"}}));
2170        tr.method = Method::POST;
2171        s.handle(tr).await.unwrap();
2172        let d = body_of(
2173            s.handle(req("/v1/describecomputeenvironments", json!({})))
2174                .await
2175                .unwrap(),
2176        );
2177        let ce = &d["computeEnvironments"][0];
2178        assert_eq!(ce["tags"]["team"], "data");
2179        assert_eq!(ce["tags"]["env"], "prod");
2180        // AWS always reports the orchestration backend.
2181        assert_eq!(ce["containerOrchestrationType"], "ECS");
2182    }
2183
2184    #[tokio::test]
2185    async fn job_submit_describe_cancel_lifecycle() {
2186        let s = svc();
2187        mk_queue(&s, "q1").await;
2188        let sub = body_of(
2189            s.handle(req(
2190                "/v1/submitjob",
2191                json!({"jobName": "j1", "jobQueue": "q1", "jobDefinition": "jd:1"}),
2192            ))
2193            .await
2194            .unwrap(),
2195        );
2196        let job_id = sub["jobId"].as_str().unwrap().to_string();
2197        assert_eq!(sub["jobName"], "j1");
2198
2199        let d = body_of(
2200            s.handle(req("/v1/describejobs", json!({"jobs": [job_id]})))
2201                .await
2202                .unwrap(),
2203        );
2204        assert_eq!(d["jobs"][0]["status"], "SUBMITTED");
2205
2206        // No jobStatus -> AWS returns only RUNNING (this job is SUBMITTED).
2207        let running = body_of(
2208            s.handle(req("/v1/listjobs", json!({"jobQueue": "q1"})))
2209                .await
2210                .unwrap(),
2211        );
2212        assert_eq!(running["jobSummaryList"].as_array().unwrap().len(), 0);
2213        // Filtering by the actual status returns it, and the summary carries
2214        // jobArn (an always-present field the old summary dropped).
2215        let l = body_of(
2216            s.handle(req(
2217                "/v1/listjobs",
2218                json!({"jobQueue": "q1", "jobStatus": "SUBMITTED"}),
2219            ))
2220            .await
2221            .unwrap(),
2222        );
2223        assert_eq!(l["jobSummaryList"].as_array().unwrap().len(), 1);
2224        assert!(l["jobSummaryList"][0]["jobArn"].as_str().is_some());
2225
2226        // Listing with no selector is an error (AWS requires exactly one).
2227        match s.handle(req("/v1/listjobs", json!({}))).await {
2228            Err(e) => assert!(format!("{e:?}").contains("exactly one")),
2229            Ok(_) => panic!("ListJobs without a selector must be rejected"),
2230        }
2231
2232        s.handle(req(
2233            "/v1/canceljob",
2234            json!({"jobId": job_id, "reason": "stop it"}),
2235        ))
2236        .await
2237        .unwrap();
2238        let d2 = body_of(
2239            s.handle(req("/v1/describejobs", json!({"jobs": [job_id]})))
2240                .await
2241                .unwrap(),
2242        );
2243        assert_eq!(d2["jobs"][0]["status"], "FAILED");
2244        assert_eq!(d2["jobs"][0]["statusReason"], "stop it");
2245    }
2246
2247    #[tokio::test]
2248    async fn scheduling_policy_crud() {
2249        let s = svc();
2250        let c = body_of(
2251            s.handle(req("/v1/createschedulingpolicy", json!({"name": "sp1"})))
2252                .await
2253                .unwrap(),
2254        );
2255        let arn = c["arn"].as_str().unwrap().to_string();
2256        assert!(arn.contains("scheduling-policy/sp1"));
2257        let d = body_of(
2258            s.handle(req(
2259                "/v1/describeschedulingpolicies",
2260                json!({"arns": [arn]}),
2261            ))
2262            .await
2263            .unwrap(),
2264        );
2265        assert_eq!(d["schedulingPolicies"].as_array().unwrap().len(), 1);
2266        s.handle(req("/v1/deleteschedulingpolicy", json!({"arn": "sp1"})))
2267            .await
2268            .unwrap();
2269        let l = body_of(
2270            s.handle(req("/v1/listschedulingpolicies", json!({})))
2271                .await
2272                .unwrap(),
2273        );
2274        assert_eq!(l["schedulingPolicies"].as_array().unwrap().len(), 0);
2275    }
2276
2277    #[tokio::test]
2278    async fn update_compute_environment_persists() {
2279        let s = svc();
2280        s.handle(req(
2281            "/v1/createcomputeenvironment",
2282            json!({"computeEnvironmentName": "ce1", "state": "ENABLED"}),
2283        ))
2284        .await
2285        .unwrap();
2286        s.handle(req(
2287            "/v1/updatecomputeenvironment",
2288            json!({"computeEnvironment": "ce1", "state": "DISABLED"}),
2289        ))
2290        .await
2291        .unwrap();
2292        let d = body_of(
2293            s.handle(req("/v1/describecomputeenvironments", json!({})))
2294                .await
2295                .unwrap(),
2296        );
2297        assert_eq!(d["computeEnvironments"][0]["state"], "DISABLED");
2298    }
2299
2300    #[test]
2301    fn container_resources_from_both_shapes() {
2302        // Legacy vcpus/memory.
2303        let (v, m) = container_resources(&json!({"vcpus": 2, "memory": 2048}));
2304        assert_eq!(v, 2.0);
2305        assert_eq!(m, 2048);
2306        // Modern resourceRequirements override.
2307        let (v, m) = container_resources(&json!({
2308            "resourceRequirements": [
2309                {"type": "VCPU", "value": "4"},
2310                {"type": "MEMORY", "value": "8192"}
2311            ]
2312        }));
2313        assert_eq!(v, 4.0);
2314        assert_eq!(m, 8192);
2315        // Defaults + floors.
2316        let (v, m) = container_resources(&json!({}));
2317        assert_eq!(v, 1.0);
2318        assert_eq!(m, 512);
2319    }
2320
2321    #[tokio::test]
2322    async fn resolve_container_picks_latest_revision_and_overrides() {
2323        let s = svc();
2324        for cmd in [json!(["echo", "v1"]), json!(["echo", "v2"])] {
2325            s.handle(req(
2326                "/v1/registerjobdefinition",
2327                json!({
2328                    "jobDefinitionName": "jd",
2329                    "type": "container",
2330                    "containerProperties": {"image": "alpine", "command": cmd}
2331                }),
2332            ))
2333            .await
2334            .unwrap();
2335        }
2336        // Bare name -> latest revision (v2).
2337        let c = s
2338            .resolve_container("123456789012", "jd", &json!({}))
2339            .unwrap();
2340        assert_eq!(c["command"], json!(["echo", "v2"]));
2341        // containerOverrides win.
2342        let c = s
2343            .resolve_container(
2344                "123456789012",
2345                "jd:1",
2346                &json!({"containerOverrides": {"command": ["overridden"]}}),
2347            )
2348            .unwrap();
2349        assert_eq!(c["command"], json!(["overridden"]));
2350        assert_eq!(c["image"], "alpine");
2351    }
2352
2353    #[tokio::test]
2354    async fn submit_without_ecs_parks_at_submitted_never_auto_succeeds() {
2355        // No ECS backend wired: the job must stay SUBMITTED (honest), not
2356        // fake a SUCCEEDED like the rivals do.
2357        let s = svc();
2358        mk_queue(&s, "q").await;
2359        s.handle(req(
2360            "/v1/registerjobdefinition",
2361            json!({"jobDefinitionName": "jd", "type": "container",
2362                   "containerProperties": {"image": "alpine"}}),
2363        ))
2364        .await
2365        .unwrap();
2366        let sub = body_of(
2367            s.handle(req(
2368                "/v1/submitjob",
2369                json!({"jobName": "j", "jobQueue": "q", "jobDefinition": "jd"}),
2370            ))
2371            .await
2372            .unwrap(),
2373        );
2374        let id = sub["jobId"].as_str().unwrap().to_string();
2375        let d = body_of(
2376            s.handle(req("/v1/describejobs", json!({"jobs": [id]})))
2377                .await
2378                .unwrap(),
2379        );
2380        assert_eq!(d["jobs"][0]["status"], "SUBMITTED");
2381    }
2382
2383    #[tokio::test]
2384    async fn reconcile_fails_in_flight_jobs_after_restart() {
2385        // Simulate a restart: a snapshot restored a job in a non-terminal state
2386        // whose background driver no longer exists. Reconcile must fail it
2387        // (with a clear reason) rather than leave it frozen forever.
2388        let s = svc();
2389        mk_queue(&s, "q").await;
2390        s.handle(req(
2391            "/v1/registerjobdefinition",
2392            json!({"jobDefinitionName": "jd", "type": "container",
2393                   "containerProperties": {"image": "alpine"}}),
2394        ))
2395        .await
2396        .unwrap();
2397        let sub = body_of(
2398            s.handle(req(
2399                "/v1/submitjob",
2400                json!({"jobName": "j", "jobQueue": "q", "jobDefinition": "jd"}),
2401            ))
2402            .await
2403            .unwrap(),
2404        );
2405        let id = sub["jobId"].as_str().unwrap().to_string();
2406
2407        s.reconcile_persisted_jobs().await;
2408
2409        let d = body_of(
2410            s.handle(req("/v1/describejobs", json!({"jobs": [id]})))
2411                .await
2412                .unwrap(),
2413        );
2414        assert_eq!(d["jobs"][0]["status"], "FAILED");
2415        assert_eq!(
2416            d["jobs"][0]["statusReason"],
2417            "Job interrupted by a fakecloud restart"
2418        );
2419        assert!(d["jobs"][0]["stoppedAt"].is_i64());
2420
2421        // A second reconcile is a no-op: already-terminal jobs are untouched.
2422        s.reconcile_persisted_jobs().await;
2423        let d2 = body_of(
2424            s.handle(req("/v1/describejobs", json!({"jobs": [id]})))
2425                .await
2426                .unwrap(),
2427        );
2428        assert_eq!(d2["jobs"][0]["status"], "FAILED");
2429    }
2430
2431    #[test]
2432    fn array_index_env_injected() {
2433        let c = with_array_index_env(json!({"image": "alpine"}), 5);
2434        let env = c["environment"].as_array().unwrap();
2435        assert!(env
2436            .iter()
2437            .any(|e| e["name"] == "AWS_BATCH_JOB_ARRAY_INDEX" && e["value"] == "5"));
2438    }
2439
2440    #[tokio::test]
2441    async fn array_job_spawns_children_and_parent_aggregates() {
2442        let s = svc();
2443        mk_queue(&s, "q").await;
2444        s.handle(req(
2445            "/v1/registerjobdefinition",
2446            json!({"jobDefinitionName": "jd", "type": "container",
2447                   "containerProperties": {"image": "alpine"}}),
2448        ))
2449        .await
2450        .unwrap();
2451        let sub = body_of(
2452            s.handle(req(
2453                "/v1/submitjob",
2454                json!({"jobName": "arr", "jobQueue": "q", "jobDefinition": "jd",
2455                       "arrayProperties": {"size": 3}}),
2456            ))
2457            .await
2458            .unwrap(),
2459        );
2460        let parent = sub["jobId"].as_str().unwrap().to_string();
2461
2462        // The arrayJobId selector returns exactly the 3 children (no ECS wired
2463        // -> they stay SUBMITTED honestly, so filter by that status).
2464        let listed = body_of(
2465            s.handle(req(
2466                "/v1/listjobs",
2467                json!({"arrayJobId": parent, "jobStatus": "SUBMITTED"}),
2468            ))
2469            .await
2470            .unwrap(),
2471        );
2472        let children = listed["jobSummaryList"].as_array().unwrap();
2473        assert_eq!(children.len(), 3);
2474        assert!(children.iter().all(|j| j["jobId"]
2475            .as_str()
2476            .unwrap()
2477            .starts_with(&format!("{parent}:"))));
2478
2479        // The parent aggregates: PENDING with statusSummary SUBMITTED=3.
2480        let d = body_of(
2481            s.handle(req("/v1/describejobs", json!({"jobs": [parent]})))
2482                .await
2483                .unwrap(),
2484        );
2485        let p = &d["jobs"][0];
2486        assert_eq!(p["status"], "PENDING");
2487        assert_eq!(p["arrayProperties"]["statusSummary"]["SUBMITTED"], 3);
2488        assert_eq!(p["arrayProperties"]["size"], 3);
2489    }
2490
2491    #[test]
2492    fn array_summary_terminal_states() {
2493        let succ = json!({"status": "SUCCEEDED"});
2494        let fail = json!({"status": "FAILED"});
2495        let run = json!({"status": "RUNNING"});
2496        let (_, st) = array_status_summary(&[&succ, &succ]);
2497        assert_eq!(st, "SUCCEEDED");
2498        let (_, st) = array_status_summary(&[&succ, &fail]);
2499        assert_eq!(st, "FAILED");
2500        let (_, st) = array_status_summary(&[&succ, &run]);
2501        assert_eq!(st, "RUNNING");
2502    }
2503
2504    #[tokio::test]
2505    async fn depends_on_parks_at_pending() {
2506        let s = svc();
2507        mk_queue(&s, "q").await;
2508        s.handle(req(
2509            "/v1/registerjobdefinition",
2510            json!({"jobDefinitionName": "jd", "type": "container",
2511                   "containerProperties": {"image": "alpine"}}),
2512        ))
2513        .await
2514        .unwrap();
2515        let a = body_of(
2516            s.handle(req(
2517                "/v1/submitjob",
2518                json!({"jobName": "a", "jobQueue": "q", "jobDefinition": "jd"}),
2519            ))
2520            .await
2521            .unwrap(),
2522        );
2523        let a_id = a["jobId"].as_str().unwrap().to_string();
2524        let b = body_of(
2525            s.handle(req(
2526                "/v1/submitjob",
2527                json!({"jobName": "b", "jobQueue": "q", "jobDefinition": "jd",
2528                       "dependsOn": [{"jobId": a_id, "type": "SEQUENTIAL"}]}),
2529            ))
2530            .await
2531            .unwrap(),
2532        );
2533        // B parks at PENDING (its dep A is only SUBMITTED, never SUCCEEDED
2534        // without a runtime) — it must not launch ahead of its dependency.
2535        let d = body_of(
2536            s.handle(req(
2537                "/v1/describejobs",
2538                json!({"jobs": [b["jobId"].clone()]}),
2539            ))
2540            .await
2541            .unwrap(),
2542        );
2543        assert_eq!(d["jobs"][0]["status"], "PENDING");
2544    }
2545
2546    #[test]
2547    fn routes_tag_family_by_method() {
2548        let mut r = req("/v1/tags/arn%3Aaws", json!({}));
2549        r.method = Method::GET;
2550        assert_eq!(
2551            BatchService::resolve_action(&r),
2552            Some("ListTagsForResource")
2553        );
2554        r.method = Method::DELETE;
2555        assert_eq!(BatchService::resolve_action(&r), Some("UntagResource"));
2556    }
2557
2558    #[test]
2559    fn percent_decode_handles_multibyte_without_panicking() {
2560        // A `%` adjacent to a multi-byte UTF-8 char used to slice on a non-char
2561        // boundary and panic, killing the request task. The bytes after `%` are
2562        // not ASCII hex, so the `%` and the char pass through unchanged.
2563        assert_eq!(percent_decode("%€"), "%€");
2564        // Trailing/partial escapes pass through untouched, no panic.
2565        assert_eq!(percent_decode("%"), "%");
2566        assert_eq!(percent_decode("%2"), "%2");
2567        assert_eq!(percent_decode("%zz"), "%zz");
2568        // Valid escapes still decode, and multi-byte content round-trips.
2569        assert_eq!(percent_decode("arn%3Aaws%3Abatch"), "arn:aws:batch");
2570        assert_eq!(percent_decode("caf%C3%A9"), "café");
2571        assert_eq!(percent_decode("plain"), "plain");
2572    }
2573
2574    #[tokio::test]
2575    async fn list_tags_with_multibyte_arn_does_not_panic() {
2576        let s = svc();
2577        // Raw multi-byte byte in the ARN path segment (what dispatch would pass
2578        // through unescaped) must yield a normal response, not a dropped conn.
2579        let mut r = req("/v1/tags/%€", json!({}));
2580        r.method = Method::GET;
2581        let resp = s.handle(r).await.unwrap();
2582        let v = body_of(resp);
2583        assert!(v.get("tags").is_some());
2584    }
2585
2586    #[tokio::test]
2587    async fn submit_job_rejects_out_of_range_array_size() {
2588        let s = svc();
2589        mk_queue(&s, "q").await;
2590        for bad in [1_i64, 0, -3, 10_001, 2_000_000_000] {
2591            let res = s
2592                .handle(req(
2593                    "/v1/submitjob",
2594                    json!({
2595                        "jobName": "j", "jobQueue": "q", "jobDefinition": "d",
2596                        "arrayProperties": {"size": bad}
2597                    }),
2598                ))
2599                .await;
2600            match res {
2601                Err(e) => assert!(
2602                    format!("{e:?}").contains("between 2 and 10000"),
2603                    "size {bad}: wrong error {e:?}"
2604                ),
2605                Ok(_) => panic!("size {bad}: out-of-range array size must be rejected"),
2606            }
2607        }
2608    }
2609}