// fakecloud_ecr/service.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19    EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20    Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// Action names this service reports through `supported_actions`.
///
/// Every entry has a matching arm in the dispatch `match` inside `handle`.
const SUPPORTED_ACTIONS: &[&str] = &[
    // Repository management.
    "CreateRepository",
    "DeleteRepository",
    "DescribeRepositories",
    "PutImageTagMutability",
    "PutImageScanningConfiguration",
    // Repository policies.
    "SetRepositoryPolicy",
    "GetRepositoryPolicy",
    "DeleteRepositoryPolicy",
    // Resource tagging.
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    // Images and layers.
    "PutImage",
    "BatchGetImage",
    "BatchDeleteImage",
    "BatchCheckLayerAvailability",
    "DescribeImages",
    "ListImages",
    "GetDownloadUrlForLayer",
    "InitiateLayerUpload",
    "UploadLayerPart",
    "CompleteLayerUpload",
    // Authorization.
    "GetAuthorizationToken",
    // Lifecycle policies.
    "PutLifecyclePolicy",
    "GetLifecyclePolicy",
    "DeleteLifecyclePolicy",
    "StartLifecyclePolicyPreview",
    "GetLifecyclePolicyPreview",
    // Image scanning.
    "StartImageScan",
    "DescribeImageScanFindings",
    // Registry-level settings.
    "DescribeRegistry",
    "GetRegistryPolicy",
    "PutRegistryPolicy",
    "DeleteRegistryPolicy",
    "GetRegistryScanningConfiguration",
    "PutRegistryScanningConfiguration",
    "BatchGetRepositoryScanningConfiguration",
    // Replication.
    "PutReplicationConfiguration",
    "DescribeImageReplicationStatus",
    // Pull-through cache rules.
    "CreatePullThroughCacheRule",
    "DeletePullThroughCacheRule",
    "DescribePullThroughCacheRules",
    "UpdatePullThroughCacheRule",
    "ValidatePullThroughCacheRule",
    // Account settings.
    "GetAccountSetting",
    "PutAccountSetting",
    // Repository creation templates.
    "CreateRepositoryCreationTemplate",
    "DeleteRepositoryCreationTemplate",
    "DescribeRepositoryCreationTemplates",
    "UpdateRepositoryCreationTemplate",
    // Image signing.
    "GetSigningConfiguration",
    "PutSigningConfiguration",
    "DeleteSigningConfiguration",
    "DescribeImageSigningStatus",
    // Pull-time update exclusions.
    "RegisterPullTimeUpdateExclusion",
    "DeregisterPullTimeUpdateExclusion",
    "ListPullTimeUpdateExclusions",
    // Referrers and storage class.
    "ListImageReferrers",
    "UpdateImageStorageClass",
];
83
84pub struct EcrService {
85    state: SharedEcrState,
86    snapshot_store: Option<Arc<dyn SnapshotStore>>,
87    snapshot_lock: Arc<AsyncMutex<()>>,
88    /// KMS state handle — when wired, repositories configured with
89    /// `EncryptionConfiguration.encryption_type == "KMS"` store layer
90    /// blobs encrypted under the configured CMK via
91    /// `fakecloud_kms::api::encrypt_blob` / `decrypt_blob`.
92    kms_state: Option<fakecloud_kms::SharedKmsState>,
93}
94
95impl EcrService {
96    pub fn new(state: SharedEcrState) -> Self {
97        Self {
98            state,
99            snapshot_store: None,
100            snapshot_lock: Arc::new(AsyncMutex::new(())),
101            kms_state: None,
102        }
103    }
104
105    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
106        self.snapshot_store = Some(store);
107        self
108    }
109
110    pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
111        self.kms_state = Some(kms);
112        self
113    }
114
115    /// Read-only accessor for the multi-account state. The sibling
116    /// `oci` module owns the HTTP-layer adapter for the OCI v2
117    /// protocol and needs to reach the same repositories + blobs the
118    /// JSON control-plane ops read and write.
119    pub fn state_handle(&self) -> &SharedEcrState {
120        &self.state
121    }
122
123    /// Handle for the shared KMS state when wired. `None` skips the
124    /// encrypt/decrypt paths and stores / returns plaintext blobs.
125    pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
126        self.kms_state.as_ref()
127    }
128
129    async fn save_snapshot(&self) {
130        Self::save_snapshot_with(
131            self.state.clone(),
132            self.snapshot_store.clone(),
133            self.snapshot_lock.clone(),
134        )
135        .await
136    }
137
138    /// Snapshot writer reachable from background tasks (e.g. the async
139    /// image-scanner) that don't hold a `&self` reference. Equivalent
140    /// to [`save_snapshot`] but takes the components as owned clones.
141    pub(crate) async fn save_snapshot_with(
142        state: SharedEcrState,
143        store: Option<Arc<dyn SnapshotStore>>,
144        lock: Arc<AsyncMutex<()>>,
145    ) {
146        let Some(store) = store else {
147            return;
148        };
149        let _guard = lock.lock().await;
150        let snapshot = EcrSnapshot {
151            schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
152            accounts: Some(state.read().clone()),
153        };
154        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
155            let bytes = serde_json::to_vec(&snapshot)
156                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
157            store.save(&bytes)
158        })
159        .await;
160        match join {
161            Ok(Ok(())) => {}
162            Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
163            Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
164        }
165    }
166}
167
168#[async_trait]
169impl AwsService for EcrService {
170    fn service_name(&self) -> &str {
171        "ecr"
172    }
173
174    async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
175        // OCI v2 Distribution requests come in as path-only REST
176        // (`/v2/...` with no `X-Amz-Target`). Dispatch them before the
177        // JSON control plane. Blob upload PATCH/PUT consume the raw
178        // body stream directly via a per-upload spool file — they
179        // never call `drain_request_stream`, so a 1 GiB layer push
180        // moves through fakecloud in constant memory. Other OCI routes
181        // (manifest PUT, blob HEAD/GET, …) keep using `request.body`
182        // so we drain the stream conditionally.
183        if request
184            .path_segments
185            .first()
186            .map(|s| s == "v2")
187            .unwrap_or(false)
188        {
189            // Drain unless this is a blob-upload PATCH/PUT — those
190            // routes own the streaming consumer.
191            let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
192                && request.path_segments.len() >= 5
193                && request.path_segments[request.path_segments.len() - 2] == "uploads";
194            if !is_blob_upload {
195                if let Some(stream) = request.take_body_stream() {
196                    request.body = fakecloud_core::service::drain_request_stream(stream).await?;
197                }
198            }
199            let result = crate::oci::dispatch(self, &request).await;
200            // POST/PUT/PATCH/DELETE always mutate. GET to a `blobs/<digest>`
201            // or `manifests/<reference>` endpoint also mutates because those
202            // handlers bump the touched image's `last_in_use_at` /
203            // `in_use_count` / `last_recorded_pull_time`. `tags/list` (also
204            // GET) is read-only and excluded.
205            let is_pull_get = request.method == http::Method::GET
206                && request.path_segments.len() >= 3
207                && matches!(
208                    request.path_segments[request.path_segments.len() - 2].as_str(),
209                    "blobs" | "manifests"
210                );
211            let mutates_oci = is_pull_get
212                || matches!(
213                    request.method,
214                    http::Method::POST
215                        | http::Method::PUT
216                        | http::Method::PATCH
217                        | http::Method::DELETE
218                );
219            if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
220                self.save_snapshot().await;
221            }
222            return result;
223        }
224
225        // JSON control plane: actions read `request.body`, so drain.
226        if let Some(stream) = request.take_body_stream() {
227            request.body = fakecloud_core::service::drain_request_stream(stream).await?;
228        }
229
230        let mutates = is_mutating(request.action.as_str());
231        let result = match request.action.as_str() {
232            "CreateRepository" => self.create_repository(&request),
233            "DeleteRepository" => self.delete_repository(&request),
234            "DescribeRepositories" => self.describe_repositories(&request),
235            "PutImageTagMutability" => self.put_image_tag_mutability(&request),
236            "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
237            "SetRepositoryPolicy" => self.set_repository_policy(&request),
238            "GetRepositoryPolicy" => self.get_repository_policy(&request),
239            "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
240            "TagResource" => self.tag_resource(&request),
241            "UntagResource" => self.untag_resource(&request),
242            "ListTagsForResource" => self.list_tags_for_resource(&request),
243            "PutImage" => self.put_image(&request),
244            "BatchGetImage" => self.batch_get_image(&request),
245            "BatchDeleteImage" => self.batch_delete_image(&request),
246            "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
247            "DescribeImages" => self.describe_images(&request),
248            "ListImages" => self.list_images(&request),
249            "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
250            "InitiateLayerUpload" => self.initiate_layer_upload(&request),
251            "UploadLayerPart" => self.upload_layer_part(&request),
252            "CompleteLayerUpload" => self.complete_layer_upload(&request),
253            "GetAuthorizationToken" => self.get_authorization_token(&request),
254            "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
255            "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
256            "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
257            "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
258            "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
259            "StartImageScan" => self.start_image_scan(&request),
260            "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
261            "DescribeRegistry" => self.describe_registry(&request),
262            "GetRegistryPolicy" => self.get_registry_policy(&request),
263            "PutRegistryPolicy" => self.put_registry_policy(&request),
264            "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
265            "GetRegistryScanningConfiguration" => {
266                self.get_registry_scanning_configuration(&request)
267            }
268            "PutRegistryScanningConfiguration" => {
269                self.put_registry_scanning_configuration(&request)
270            }
271            "BatchGetRepositoryScanningConfiguration" => {
272                self.batch_get_repository_scanning_configuration(&request)
273            }
274            "PutReplicationConfiguration" => self.put_replication_configuration(&request),
275            "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
276            "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
277            "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
278            "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
279            "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
280            "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
281            "GetAccountSetting" => self.get_account_setting(&request),
282            "PutAccountSetting" => self.put_account_setting(&request),
283            "CreateRepositoryCreationTemplate" => {
284                self.create_repository_creation_template(&request)
285            }
286            "DeleteRepositoryCreationTemplate" => {
287                self.delete_repository_creation_template(&request)
288            }
289            "DescribeRepositoryCreationTemplates" => {
290                self.describe_repository_creation_templates(&request)
291            }
292            "UpdateRepositoryCreationTemplate" => {
293                self.update_repository_creation_template(&request)
294            }
295            "GetSigningConfiguration" => self.get_signing_configuration(&request),
296            "PutSigningConfiguration" => self.put_signing_configuration(&request),
297            "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
298            "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
299            "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
300            "DeregisterPullTimeUpdateExclusion" => {
301                self.deregister_pull_time_update_exclusion(&request)
302            }
303            "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
304            "ListImageReferrers" => self.list_image_referrers(&request),
305            "UpdateImageStorageClass" => self.update_image_storage_class(&request),
306            _ => Err(AwsServiceError::action_not_implemented(
307                self.service_name(),
308                &request.action,
309            )),
310        };
311        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
312            self.save_snapshot().await;
313        }
314        result
315    }
316
317    fn supported_actions(&self) -> &[&str] {
318        SUPPORTED_ACTIONS
319    }
320}
321
322// -------- helpers --------
323
324// -------- operations --------
325
326impl EcrService {
327    fn create_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
328        let body = request.json_body();
329        let name = req_str(&body, "repositoryName")?.to_string();
330        validate_repository_name(&name)?;
331        let image_tag_mutability = opt_str(&body, "imageTagMutability")
332            .unwrap_or("MUTABLE")
333            .to_string();
334        if image_tag_mutability != "MUTABLE" && image_tag_mutability != "IMMUTABLE" {
335            return Err(invalid_parameter(format!(
336                "Invalid value for imageTagMutability: {image_tag_mutability}"
337            )));
338        }
339        let scan_on_push = body
340            .get("imageScanningConfiguration")
341            .and_then(|v| v.get("scanOnPush"))
342            .and_then(|v| v.as_bool())
343            .unwrap_or(false);
344        let encryption = body
345            .get("encryptionConfiguration")
346            .map(|v| EncryptionConfiguration {
347                encryption_type: v
348                    .get("encryptionType")
349                    .and_then(|x| x.as_str())
350                    .unwrap_or("AES256")
351                    .to_string(),
352                kms_key: v
353                    .get("kmsKey")
354                    .and_then(|x| x.as_str())
355                    .map(|s| s.to_string()),
356            })
357            .unwrap_or_default();
358        let tags = parse_tags(&body);
359
360        let account = target_account_id(request, &body);
361        let mut accounts = self.state.write();
362        let endpoint = accounts.endpoint().to_string();
363        let state = accounts.get_or_create(&account);
364        if state.repositories.contains_key(&name) {
365            return Err(repository_already_exists(&name));
366        }
367        let arn = state.repository_arn(&name);
368        let mut repo = Repository::new(&name, arn, state.registry_id(), &endpoint);
369        repo.image_tag_mutability = image_tag_mutability;
370        repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
371        repo.encryption_configuration = encryption;
372        for (k, v) in tags {
373            repo.tags.insert(k, v);
374        }
375        let response = repository_to_json(&repo);
376        state.repositories.insert(name.clone(), repo);
377        Ok(AwsResponse::ok_json(json!({ "repository": response })))
378    }
379
380    fn delete_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
381        let body = request.json_body();
382        let name = req_str(&body, "repositoryName")?.to_string();
383        let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
384        let account = target_account_id(request, &body);
385
386        let mut accounts = self.state.write();
387        let state = accounts
388            .get_mut(&account)
389            .ok_or_else(|| repository_not_found(&name))?;
390        let repo = state
391            .repositories
392            .get(&name)
393            .ok_or_else(|| repository_not_found(&name))?;
394        // Repository-image state lands in Batch 2; until then, nothing
395        // to block the delete, so `force` is accepted but noop-ish.
396        let _ = force;
397        let snapshot = repository_to_json(repo);
398        state.repositories.remove(&name);
399        Ok(AwsResponse::ok_json(json!({ "repository": snapshot })))
400    }
401
402    fn describe_repositories(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
403        // AWS's documented default page size for DescribeRepositories.
404        const DEFAULT_PAGE_SIZE: usize = 100;
405        let body = request.json_body();
406        let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
407            Some(n) => {
408                // Smithy @range(min=1, max=1000) on DescribeRepositories.maxResults.
409                if !(1..=1000).contains(&n) {
410                    return Err(invalid_parameter(format!(
411                        "Value '{n}' at 'maxResults' failed to satisfy constraint: \
412                         Member must have value between 1 and 1000",
413                    )));
414                }
415                n as usize
416            }
417            None => DEFAULT_PAGE_SIZE,
418        };
419        let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
420            Some(raw) => raw.parse::<usize>().map_err(|_| {
421                AwsServiceError::aws_error(
422                    StatusCode::BAD_REQUEST,
423                    "InvalidContinuationTokenException",
424                    "The specified continuation token is not valid.",
425                )
426            })?,
427            None => 0,
428        };
429        let names: Vec<String> = body
430            .get("repositoryNames")
431            .and_then(|v| v.as_array())
432            .map(|arr| {
433                arr.iter()
434                    .filter_map(|v| v.as_str().map(str::to_string))
435                    .collect()
436            })
437            .unwrap_or_default();
438        let account = target_account_id(request, &body);
439        let accounts = self.state.read();
440        let Some(state) = accounts.get(&account) else {
441            return Ok(AwsResponse::ok_json(json!({ "repositories": [] })));
442        };
443        let mut out: Vec<Value> = Vec::new();
444        let mut next_token: Option<String> = None;
445        if names.is_empty() {
446            let all: Vec<&Repository> = state.repositories.values().collect();
447            let start = offset.min(all.len());
448            let end = (start + max_results).min(all.len());
449            for repo in &all[start..end] {
450                out.push(repository_to_json(repo));
451            }
452            if end < all.len() {
453                next_token = Some(end.to_string());
454            }
455        } else {
456            for n in &names {
457                let repo = state
458                    .repositories
459                    .get(n)
460                    .ok_or_else(|| repository_not_found(n))?;
461                out.push(repository_to_json(repo));
462            }
463        }
464        let mut response = json!({ "repositories": out });
465        if let Some(token) = next_token {
466            response["nextToken"] = json!(token);
467        }
468        Ok(AwsResponse::ok_json(response))
469    }
470
471    fn put_image_tag_mutability(
472        &self,
473        request: &AwsRequest,
474    ) -> Result<AwsResponse, AwsServiceError> {
475        let body = request.json_body();
476        let name = req_str(&body, "repositoryName")?.to_string();
477        let mutability = req_str(&body, "imageTagMutability")?.to_string();
478        if mutability != "MUTABLE" && mutability != "IMMUTABLE" {
479            return Err(invalid_parameter(format!(
480                "Invalid value for imageTagMutability: {mutability}"
481            )));
482        }
483        let account = target_account_id(request, &body);
484        let mut accounts = self.state.write();
485        let state = accounts
486            .get_mut(&account)
487            .ok_or_else(|| repository_not_found(&name))?;
488        let repo = state
489            .repositories
490            .get_mut(&name)
491            .ok_or_else(|| repository_not_found(&name))?;
492        repo.image_tag_mutability = mutability.clone();
493        let registry_id = repo.registry_id.clone();
494        Ok(AwsResponse::ok_json(json!({
495            "registryId": registry_id,
496            "repositoryName": name,
497            "imageTagMutability": mutability,
498        })))
499    }
500
501    fn put_image_scanning_configuration(
502        &self,
503        request: &AwsRequest,
504    ) -> Result<AwsResponse, AwsServiceError> {
505        let body = request.json_body();
506        let name = req_str(&body, "repositoryName")?.to_string();
507        let scan_on_push = body
508            .get("imageScanningConfiguration")
509            .and_then(|v| v.get("scanOnPush"))
510            .and_then(|v| v.as_bool())
511            .ok_or_else(|| invalid_parameter("Missing imageScanningConfiguration.scanOnPush"))?;
512        let account = target_account_id(request, &body);
513        let mut accounts = self.state.write();
514        let state = accounts
515            .get_mut(&account)
516            .ok_or_else(|| repository_not_found(&name))?;
517        let repo = state
518            .repositories
519            .get_mut(&name)
520            .ok_or_else(|| repository_not_found(&name))?;
521        repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
522        let registry_id = repo.registry_id.clone();
523        Ok(AwsResponse::ok_json(json!({
524            "registryId": registry_id,
525            "repositoryName": name,
526            "imageScanningConfiguration": { "scanOnPush": scan_on_push },
527        })))
528    }
529
530    fn set_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
531        let body = request.json_body();
532        let name = req_str(&body, "repositoryName")?.to_string();
533        let policy_text = req_str(&body, "policyText")?.to_string();
534        let account = target_account_id(request, &body);
535        let mut accounts = self.state.write();
536        let state = accounts
537            .get_mut(&account)
538            .ok_or_else(|| repository_not_found(&name))?;
539        let repo = state
540            .repositories
541            .get_mut(&name)
542            .ok_or_else(|| repository_not_found(&name))?;
543        repo.policy = Some(policy_text.clone());
544        let registry_id = repo.registry_id.clone();
545        Ok(AwsResponse::ok_json(json!({
546            "registryId": registry_id,
547            "repositoryName": name,
548            "policyText": policy_text,
549        })))
550    }
551
552    fn get_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
553        let body = request.json_body();
554        let name = req_str(&body, "repositoryName")?.to_string();
555        let account = target_account_id(request, &body);
556        let accounts = self.state.read();
557        let state = accounts
558            .get(&account)
559            .ok_or_else(|| repository_not_found(&name))?;
560        let repo = state
561            .repositories
562            .get(&name)
563            .ok_or_else(|| repository_not_found(&name))?;
564        let policy = repo
565            .policy
566            .clone()
567            .ok_or_else(|| repository_policy_not_found(&name))?;
568        Ok(AwsResponse::ok_json(json!({
569            "registryId": repo.registry_id,
570            "repositoryName": name,
571            "policyText": policy,
572        })))
573    }
574
575    fn delete_repository_policy(
576        &self,
577        request: &AwsRequest,
578    ) -> Result<AwsResponse, AwsServiceError> {
579        let body = request.json_body();
580        let name = req_str(&body, "repositoryName")?.to_string();
581        let account = target_account_id(request, &body);
582        let mut accounts = self.state.write();
583        let state = accounts
584            .get_mut(&account)
585            .ok_or_else(|| repository_not_found(&name))?;
586        let repo = state
587            .repositories
588            .get_mut(&name)
589            .ok_or_else(|| repository_not_found(&name))?;
590        let policy = repo
591            .policy
592            .take()
593            .ok_or_else(|| repository_policy_not_found(&name))?;
594        let registry_id = repo.registry_id.clone();
595        Ok(AwsResponse::ok_json(json!({
596            "registryId": registry_id,
597            "repositoryName": name,
598            "policyText": policy,
599        })))
600    }
601
602    fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603        let body = request.json_body();
604        let arn = req_str(&body, "resourceArn")?.to_string();
605        let (arn_account, name) = decode_resource_arn(&arn)?;
606        let tags = parse_tags(&body);
607        let account = arn_account.unwrap_or_else(|| request.account_id.clone());
608        let mut accounts = self.state.write();
609        let state = accounts
610            .get_mut(&account)
611            .ok_or_else(|| repository_not_found(&name))?;
612        let repo = state
613            .repositories
614            .get_mut(&name)
615            .ok_or_else(|| repository_not_found(&name))?;
616        for (k, v) in tags {
617            repo.tags.insert(k, v);
618        }
619        Ok(AwsResponse::ok_json(json!({})))
620    }
621
622    fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
623        let body = request.json_body();
624        let arn = req_str(&body, "resourceArn")?.to_string();
625        let (arn_account, name) = decode_resource_arn(&arn)?;
626        let keys: Vec<String> = body
627            .get("tagKeys")
628            .and_then(|v| v.as_array())
629            .map(|arr| {
630                arr.iter()
631                    .filter_map(|v| v.as_str().map(str::to_string))
632                    .collect()
633            })
634            .unwrap_or_default();
635        let account = arn_account.unwrap_or_else(|| request.account_id.clone());
636        let mut accounts = self.state.write();
637        let state = accounts
638            .get_mut(&account)
639            .ok_or_else(|| repository_not_found(&name))?;
640        let repo = state
641            .repositories
642            .get_mut(&name)
643            .ok_or_else(|| repository_not_found(&name))?;
644        for k in keys {
645            repo.tags.remove(&k);
646        }
647        Ok(AwsResponse::ok_json(json!({})))
648    }
649
650    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
651        let body = request.json_body();
652        let arn = req_str(&body, "resourceArn")?.to_string();
653        let (arn_account, name) = decode_resource_arn(&arn)?;
654        let account = arn_account.unwrap_or_else(|| request.account_id.clone());
655        let accounts = self.state.read();
656        let state = accounts
657            .get(&account)
658            .ok_or_else(|| repository_not_found(&name))?;
659        let repo = state
660            .repositories
661            .get(&name)
662            .ok_or_else(|| repository_not_found(&name))?;
663        let tags: Vec<Value> = repo
664            .tags
665            .iter()
666            .map(|(k, v)| json!({ "Key": k, "Value": v }))
667            .collect();
668        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
669    }
670}
671
672// -------- image + layer helpers --------
673
674// -------- image + layer operations --------
675
676impl EcrService {
677    fn put_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
678        let body = request.json_body();
679        let name = req_str(&body, "repositoryName")?.to_string();
680        let manifest = req_str(&body, "imageManifest")?.to_string();
681        let manifest_media_type = opt_str(&body, "imageManifestMediaType")
682            .unwrap_or("application/vnd.docker.distribution.manifest.v2+json")
683            .to_string();
684        let supplied_tag = opt_str(&body, "imageTag").map(|s| s.to_string());
685        let supplied_digest = opt_str(&body, "imageDigest").map(|s| s.to_string());
686        let account = target_account_id(request, &body);
687
688        let computed_digest = sha256_digest(manifest.as_bytes());
689        if let Some(ref supplied) = supplied_digest {
690            if supplied != &computed_digest {
691                return Err(AwsServiceError::aws_error(
692                    StatusCode::BAD_REQUEST,
693                    "ImageDigestDoesNotMatchException",
694                    format!(
695                        "The imageDigest '{supplied}' does not match the digest of the uploaded manifest ('{computed_digest}')."
696                    ),
697                ));
698            }
699        }
700        let digest = supplied_digest.unwrap_or_else(|| computed_digest.clone());
701
702        let mut accounts = self.state.write();
703        let state = accounts
704            .get_mut(&account)
705            .ok_or_else(|| repository_not_found(&name))?;
706        let registry_match =
707            registry_scan_on_push_matches(&state.registry_scanning_configuration, &name);
708        let repo = state
709            .repositories
710            .get_mut(&name)
711            .ok_or_else(|| repository_not_found(&name))?;
712
713        check_repo_policy(
714            &account,
715            &request.account_id,
716            &repo.repository_arn,
717            &name,
718            repo.policy.as_deref(),
719            "ecr:PutImage",
720        )?;
721
722        // Immutable tag guard: if a tag is supplied, it already maps to
723        // a different digest, and the repo is IMMUTABLE, reject.
724        if let Some(ref tag) = supplied_tag {
725            if let Some(existing) = repo.image_tags.get(tag) {
726                if existing != &digest && repo.image_tag_mutability == "IMMUTABLE" {
727                    return Err(image_already_exists(&name, tag));
728                }
729            }
730        }
731
732        let image_entry = repo.images.entry(digest.clone()).or_insert_with(|| Image {
733            image_digest: digest.clone(),
734            image_manifest: manifest.clone(),
735            image_manifest_media_type: manifest_media_type.clone(),
736            artifact_media_type: None,
737            image_size_in_bytes: manifest.len() as u64,
738            image_pushed_at: Utc::now(),
739            last_recorded_pull_time: None,
740            image_status: "ACTIVE".to_string(),
741            last_archived_at: None,
742            last_activated_at: None,
743            last_in_use_at: None,
744            in_use_count: 0,
745        });
746        // If the caller re-pushes an existing digest with a new manifest
747        // payload (shouldn't happen under sha256 addressing but tolerate
748        // it), keep the latest manifest.
749        image_entry.image_manifest = manifest;
750        image_entry.image_manifest_media_type = manifest_media_type.clone();
751
752        if let Some(tag) = supplied_tag.clone() {
753            repo.image_tags.insert(tag, digest.clone());
754        }
755
756        let snapshot = repo.images.get(&digest).cloned().unwrap();
757        let scan_on_push = repo.image_scanning_configuration.scan_on_push;
758        // Registry-level fallback: when the per-repo flag is off but a
759        // registry scanning rule with frequency=SCAN_ON_PUSH matches the
760        // repo's name via its WILDCARD filter, real ECR still scans.
761        let should_scan = scan_on_push || registry_match;
762        let tag_ref = supplied_tag.as_deref();
763        let response = AwsResponse::ok_json(json!({
764            "image": {
765                "registryId": repo.registry_id,
766                "repositoryName": name,
767                "imageId": image_id_for(&snapshot, tag_ref),
768                "imageManifest": snapshot.image_manifest,
769                "imageManifestMediaType": snapshot.image_manifest_media_type,
770            }
771        }));
772        // Drop the lock before kicking the async scan; trigger_scan
773        // re-acquires it to mark IN_PROGRESS and spawn the scanner.
774        drop(accounts);
775        if should_scan {
776            self.trigger_scan(&account, &name, &digest);
777        }
778        self.replicate_image(&account, &name, &digest);
779        Ok(response)
780    }
781
782    /// Copy a freshly-pushed image to every destination configured by
783    /// the registry's replication rules, recording the per-destination
784    /// status so DescribeImageReplicationStatus returns real data. The
785    /// status flips IN_PROGRESS -> COMPLETE around each copy so a future
786    /// async path can surface real progress without reshaping the API.
787    /// Cross-account targets get a separate `EcrState`; cross-region
788    /// same-account targets share the source state (fakecloud collapses
789    /// regions per process) but still get their own status entry.
790    fn replicate_image(&self, source_account: &str, repo_name: &str, digest: &str) {
791        use crate::state::{ImageReplicationStatus, Repository};
792
793        let (rules, image, layer_blobs, source_registry_id, source_region, source_uri) = {
794            let accounts = self.state.read();
795            let Some(state) = accounts.get(source_account) else {
796                return;
797            };
798            let Some(cfg) = state.replication_configuration.as_ref() else {
799                return;
800            };
801            let Some(repo) = state.repositories.get(repo_name) else {
802                return;
803            };
804            let Some(image) = repo.images.get(digest).cloned() else {
805                return;
806            };
807            let layers: Vec<crate::state::Layer> = layers_for_image(repo, digest);
808            (
809                cfg.rules.clone(),
810                image,
811                layers,
812                repo.registry_id.clone(),
813                state.region.clone(),
814                repo.repository_uri.clone(),
815            )
816        };
817
818        // Source URI is `<host>/<repo_name>` — strip the trailing repo
819        // name to recover the host so the destination repo's URI keeps
820        // pointing at the same fakecloud endpoint.
821        let endpoint = source_uri
822            .strip_suffix(&format!("/{repo_name}"))
823            .unwrap_or(&source_uri)
824            .to_string();
825
826        let matching: Vec<_> = rules
827            .into_iter()
828            .filter(|rule| repository_filters_match(&rule.repository_filters, repo_name))
829            .flat_map(|rule| rule.destinations.into_iter())
830            .collect();
831        if matching.is_empty() {
832            return;
833        }
834
835        // Stage IN_PROGRESS entries before any copying, then flip each
836        // to COMPLETE once its copy lands. This mirrors the AWS lifecycle
837        // and lets DescribeImageReplicationStatus return meaningful state
838        // even mid-flight.
839        let mut statuses: Vec<ImageReplicationStatus> = matching
840            .iter()
841            .filter(|dest| {
842                // Same registry + same region is a no-op target — the
843                // image is already where the rule points.
844                !(dest.registry_id == source_registry_id && dest.region == source_region)
845            })
846            .map(|dest| ImageReplicationStatus {
847                region: dest.region.clone(),
848                registry_id: dest.registry_id.clone(),
849                status: "IN_PROGRESS".to_string(),
850                failure_code: None,
851                failure_reason: None,
852            })
853            .collect();
854
855        if statuses.is_empty() {
856            // Still wipe any stale entries for this digest so callers see
857            // an honest empty list when no rule actually targets a
858            // distinct destination.
859            let mut accounts = self.state.write();
860            let source_state = accounts.get_or_create(source_account);
861            if let Some(repo) = source_state.repositories.get_mut(repo_name) {
862                repo.replication_statuses.remove(digest);
863            }
864            return;
865        }
866
867        let mut accounts = self.state.write();
868        for (idx, dest) in matching.iter().enumerate() {
869            if dest.registry_id == source_registry_id && dest.region == source_region {
870                continue;
871            }
872            // Map the source-side `matching` index to its corresponding
873            // status slot (which has the same filter applied).
874            let status_idx = matching
875                .iter()
876                .take(idx + 1)
877                .filter(|d| !(d.registry_id == source_registry_id && d.region == source_region))
878                .count()
879                - 1;
880
881            // Cross-account: hop into the destination registry's state.
882            // Cross-region same-account: stay in source state — the same
883            // fakecloud process serves the "destination" region too, so a
884            // separate copy would duplicate without benefit. The status
885            // entry still records the rule fired.
886            if dest.registry_id != source_registry_id {
887                let target_state = accounts.get_or_create(&dest.registry_id);
888                // Provision the mirror repo on demand. Real ECR only
889                // replicates into existing repos; create-on-write keeps
890                // the fakecloud flow honest by mirroring the source repo
891                // metadata when the target hasn't been pre-provisioned.
892                if !target_state.repositories.contains_key(repo_name) {
893                    let arn = format!(
894                        "arn:aws:ecr:{}:{}:repository/{}",
895                        dest.region, dest.registry_id, repo_name
896                    );
897                    let repo = Repository::new(repo_name, arn, &dest.registry_id, &endpoint);
898                    target_state
899                        .repositories
900                        .insert(repo_name.to_string(), repo);
901                }
902                let target_repo = target_state.repositories.get_mut(repo_name).unwrap();
903                target_repo
904                    .images
905                    .entry(digest.to_string())
906                    .or_insert_with(|| image.clone());
907                for layer in &layer_blobs {
908                    target_repo
909                        .layers
910                        .entry(layer.digest.clone())
911                        .or_insert_with(|| layer.clone());
912                }
913            }
914            statuses[status_idx].status = "COMPLETE".to_string();
915        }
916
917        // Record per-destination replication status on the source repo
918        // so DescribeImageReplicationStatus surfaces real entries.
919        let source_state = accounts.get_or_create(source_account);
920        if let Some(repo) = source_state.repositories.get_mut(repo_name) {
921            repo.replication_statuses
922                .insert(digest.to_string(), statuses);
923        }
924    }
925
926    /// Mark `digest` as `IN_PROGRESS` and spawn the scanner task.
927    /// Identical wiring to `start_image_scan` minus the request-shaped
928    /// response — used both by the user-facing `StartImageScan` and the
929    /// `scan_on_push=true` PutImage hook.
930    fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
931        use crate::state::ImageScanFindings;
932        let layers = {
933            let mut accounts = self.state.write();
934            let Some(state) = accounts.get_mut(account) else {
935                return;
936            };
937            let Some(repo) = state.repositories.get_mut(name) else {
938                return;
939            };
940            repo.scan_findings.insert(
941                digest.to_string(),
942                ImageScanFindings {
943                    image_digest: digest.to_string(),
944                    scan_status: "IN_PROGRESS".to_string(),
945                    scan_completed_at: None,
946                    vulnerability_source_updated_at: None,
947                    finding_severity_counts: BTreeMap::new(),
948                    findings: Vec::new(),
949                },
950            );
951            layers_for_image(repo, digest)
952        };
953        let shared = self.state.clone();
954        let store = self.snapshot_store.clone();
955        let snap_lock = self.snapshot_lock.clone();
956        let account = account.to_string();
957        let name = name.to_string();
958        let digest = digest.to_string();
959        tokio::spawn(async move {
960            let result = crate::scanner::scan_layers(&digest, &layers).await;
961            {
962                let mut accounts = shared.write();
963                let Some(state) = accounts.get_mut(&account) else {
964                    return;
965                };
966                let Some(repo) = state.repositories.get_mut(&name) else {
967                    return;
968                };
969                let findings = result.unwrap_or_else(|| ImageScanFindings {
970                    image_digest: digest.clone(),
971                    scan_status: "COMPLETE".to_string(),
972                    scan_completed_at: Some(Utc::now()),
973                    vulnerability_source_updated_at: Some(Utc::now()),
974                    finding_severity_counts: BTreeMap::new(),
975                    findings: Vec::new(),
976                });
977                repo.scan_findings.insert(digest.clone(), findings);
978            }
979            EcrService::save_snapshot_with(shared, store, snap_lock).await;
980        });
981    }
982
983    fn batch_get_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
984        let body = request.json_body();
985        let name = req_str(&body, "repositoryName")?.to_string();
986        let ids = body
987            .get("imageIds")
988            .and_then(|v| v.as_array())
989            .cloned()
990            .unwrap_or_default();
991        let account = target_account_id(request, &body);
992        // Take a write lock so pull-time bookkeeping (`lastInUseAt`,
993        // `inUseCount`, `lastRecordedPullTime`) can update in place. Real
994        // ECR records pull time at least every 24 h on the data plane;
995        // fakecloud bumps unconditionally so tests can observe each call.
996        let mut accounts = self.state.write();
997        let state = accounts
998            .get_mut(&account)
999            .ok_or_else(|| repository_not_found(&name))?;
1000        // Snapshot the registered exclusion ARNs before grabbing the repo
1001        // mut so `touch_image_pull` can honour the exclusion contract
1002        // without holding two `&mut`s on `EcrState`.
1003        let exclusions = pull_time_exclusion_set(state);
1004        let repo = state
1005            .repositories
1006            .get_mut(&name)
1007            .ok_or_else(|| repository_not_found(&name))?;
1008
1009        check_repo_policy(
1010            &account,
1011            &request.account_id,
1012            &repo.repository_arn,
1013            &name,
1014            repo.policy.as_deref(),
1015            "ecr:BatchGetImage",
1016        )?;
1017
1018        let mut images: Vec<Value> = Vec::new();
1019        let mut failures: Vec<Value> = Vec::new();
1020        let mut hit_digests: Vec<String> = Vec::new();
1021        for id in &ids {
1022            match resolve_image_digest(repo, id) {
1023                Some(digest) => {
1024                    let img = repo.images.get(&digest).unwrap();
1025                    let tag = id.get("imageTag").and_then(|v| v.as_str());
1026                    images.push(json!({
1027                        "registryId": repo.registry_id,
1028                        "repositoryName": name,
1029                        "imageId": image_id_for(img, tag),
1030                        "imageManifest": img.image_manifest,
1031                        "imageManifestMediaType": img.image_manifest_media_type,
1032                    }));
1033                    hit_digests.push(digest);
1034                }
1035                None => failures.push(json!({
1036                    "imageId": id,
1037                    "failureCode": "ImageNotFound",
1038                    "failureReason": "Requested image not found",
1039                })),
1040            }
1041        }
1042        let caller_arn = request.principal.as_ref().map(|p| p.arn.as_str());
1043        touch_image_pull(repo, &hit_digests, caller_arn, &exclusions);
1044        Ok(AwsResponse::ok_json(json!({
1045            "images": images,
1046            "failures": failures,
1047        })))
1048    }
1049
1050    fn batch_delete_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1051        let body = request.json_body();
1052        let name = req_str(&body, "repositoryName")?.to_string();
1053        let ids = body
1054            .get("imageIds")
1055            .and_then(|v| v.as_array())
1056            .cloned()
1057            .unwrap_or_default();
1058        let account = target_account_id(request, &body);
1059        let mut accounts = self.state.write();
1060        let state = accounts
1061            .get_mut(&account)
1062            .ok_or_else(|| repository_not_found(&name))?;
1063        let repo = state
1064            .repositories
1065            .get_mut(&name)
1066            .ok_or_else(|| repository_not_found(&name))?;
1067
1068        check_repo_policy(
1069            &account,
1070            &request.account_id,
1071            &repo.repository_arn,
1072            &name,
1073            repo.policy.as_deref(),
1074            "ecr:BatchDeleteImage",
1075        )?;
1076
1077        let mut deleted: Vec<Value> = Vec::new();
1078        let mut failures: Vec<Value> = Vec::new();
1079        for id in &ids {
1080            if let Some(tag) = id.get("imageTag").and_then(|v| v.as_str()) {
1081                // Delete by tag: remove only the tag, image stays if
1082                // other tags still reference the digest.
1083                if let Some(digest) = repo.image_tags.remove(tag) {
1084                    deleted.push(json!({ "imageDigest": digest, "imageTag": tag }));
1085                    let still_tagged = repo.image_tags.values().any(|d| *d == digest);
1086                    if !still_tagged {
1087                        repo.images.remove(&digest);
1088                    }
1089                    continue;
1090                }
1091                failures.push(json!({
1092                    "imageId": id,
1093                    "failureCode": "ImageNotFound",
1094                    "failureReason": "Requested image not found",
1095                }));
1096            } else if let Some(digest) = id.get("imageDigest").and_then(|v| v.as_str()) {
1097                if repo.images.remove(digest).is_some() {
1098                    repo.image_tags.retain(|_, d| d != digest);
1099                    deleted.push(json!({ "imageDigest": digest }));
1100                    continue;
1101                }
1102                failures.push(json!({
1103                    "imageId": id,
1104                    "failureCode": "ImageNotFound",
1105                    "failureReason": "Requested image not found",
1106                }));
1107            } else {
1108                failures.push(json!({
1109                    "imageId": id,
1110                    "failureCode": "InvalidImageTag",
1111                    "failureReason": "Either imageDigest or imageTag must be supplied",
1112                }));
1113            }
1114        }
1115        Ok(AwsResponse::ok_json(json!({
1116            "imageIds": deleted,
1117            "failures": failures,
1118        })))
1119    }
1120
1121    fn batch_check_layer_availability(
1122        &self,
1123        request: &AwsRequest,
1124    ) -> Result<AwsResponse, AwsServiceError> {
1125        let body = request.json_body();
1126        let name = req_str(&body, "repositoryName")?.to_string();
1127        let digests: Vec<String> = body
1128            .get("layerDigests")
1129            .and_then(|v| v.as_array())
1130            .map(|arr| {
1131                arr.iter()
1132                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1133                    .collect()
1134            })
1135            .unwrap_or_default();
1136        if digests.is_empty() {
1137            return Err(invalid_parameter(
1138                "At least one layerDigest must be supplied to BatchCheckLayerAvailability",
1139            ));
1140        }
1141        let account = target_account_id(request, &body);
1142        let accounts = self.state.read();
1143        let state = accounts
1144            .get(&account)
1145            .ok_or_else(|| repository_not_found(&name))?;
1146        let repo = state
1147            .repositories
1148            .get(&name)
1149            .ok_or_else(|| repository_not_found(&name))?;
1150        check_repo_policy(
1151            &account,
1152            &request.account_id,
1153            &repo.repository_arn,
1154            &name,
1155            repo.policy.as_deref(),
1156            "ecr:BatchCheckLayerAvailability",
1157        )?;
1158        let mut layers: Vec<Value> = Vec::new();
1159        let mut failures: Vec<Value> = Vec::new();
1160        for digest in &digests {
1161            match repo.layers.get(digest) {
1162                Some(layer) => layers.push(json!({
1163                    "layerDigest": layer.digest,
1164                    "layerAvailability": "AVAILABLE",
1165                    "layerSize": layer.size,
1166                    "mediaType": layer.media_type,
1167                })),
1168                None => failures.push(json!({
1169                    "layerDigest": digest,
1170                    "failureCode": "MissingLayerDigest",
1171                    "failureReason": "Layer not found in repository",
1172                })),
1173            }
1174        }
1175        Ok(AwsResponse::ok_json(json!({
1176            "layers": layers,
1177            "failures": failures,
1178        })))
1179    }
1180
1181    fn describe_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1182        const DEFAULT_PAGE_SIZE: usize = 100;
1183        let body = request.json_body();
1184        let name = req_str(&body, "repositoryName")?.to_string();
1185        let ids = body
1186            .get("imageIds")
1187            .and_then(|v| v.as_array())
1188            .cloned()
1189            .unwrap_or_default();
1190        let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
1191            Some(n) => {
1192                if !(1..=1000).contains(&n) {
1193                    return Err(invalid_parameter(format!(
1194                        "Value '{n}' at 'maxResults' failed to satisfy constraint: \
1195                         Member must have value between 1 and 1000",
1196                    )));
1197                }
1198                n as usize
1199            }
1200            None => DEFAULT_PAGE_SIZE,
1201        };
1202        let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
1203            Some(raw) => raw.parse::<usize>().map_err(|_| {
1204                AwsServiceError::aws_error(
1205                    StatusCode::BAD_REQUEST,
1206                    "InvalidContinuationTokenException",
1207                    "The specified continuation token is not valid.",
1208                )
1209            })?,
1210            None => 0,
1211        };
1212        let account = target_account_id(request, &body);
1213        let accounts = self.state.read();
1214        let state = accounts
1215            .get(&account)
1216            .ok_or_else(|| repository_not_found(&name))?;
1217        let repo = state
1218            .repositories
1219            .get(&name)
1220            .ok_or_else(|| repository_not_found(&name))?;
1221
1222        let mut details: Vec<Value> = Vec::new();
1223        let mut next_token: Option<String> = None;
1224        if ids.is_empty() {
1225            let all: Vec<&Image> = repo.images.values().collect();
1226            let start = offset.min(all.len());
1227            let end = (start + max_results).min(all.len());
1228            for img in &all[start..end] {
1229                details.push(image_to_details(repo, img, &repo.registry_id));
1230            }
1231            if end < all.len() {
1232                next_token = Some(end.to_string());
1233            }
1234        } else {
1235            for id in &ids {
1236                let digest =
1237                    resolve_image_digest(repo, id).ok_or_else(|| image_not_found(&name, id))?;
1238                let img = repo.images.get(&digest).unwrap();
1239                details.push(image_to_details(repo, img, &repo.registry_id));
1240            }
1241        }
1242        let mut response = json!({ "imageDetails": details });
1243        if let Some(token) = next_token {
1244            response["nextToken"] = json!(token);
1245        }
1246        Ok(AwsResponse::ok_json(response))
1247    }
1248
1249    fn list_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1250        const DEFAULT_PAGE_SIZE: usize = 100;
1251        let body = request.json_body();
1252        let name = req_str(&body, "repositoryName")?.to_string();
1253        let filter_tag_status = body
1254            .get("filter")
1255            .and_then(|v| v.get("tagStatus"))
1256            .and_then(|v| v.as_str())
1257            .map(|s| s.to_string());
1258        let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
1259            Some(n) => {
1260                if !(1..=1000).contains(&n) {
1261                    return Err(invalid_parameter(format!(
1262                        "Value '{n}' at 'maxResults' failed to satisfy constraint: \
1263                         Member must have value between 1 and 1000",
1264                    )));
1265                }
1266                n as usize
1267            }
1268            None => DEFAULT_PAGE_SIZE,
1269        };
1270        let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
1271            Some(raw) => raw.parse::<usize>().map_err(|_| {
1272                AwsServiceError::aws_error(
1273                    StatusCode::BAD_REQUEST,
1274                    "InvalidContinuationTokenException",
1275                    "The specified continuation token is not valid.",
1276                )
1277            })?,
1278            None => 0,
1279        };
1280        let account = target_account_id(request, &body);
1281        let accounts = self.state.read();
1282        let state = accounts
1283            .get(&account)
1284            .ok_or_else(|| repository_not_found(&name))?;
1285        let repo = state
1286            .repositories
1287            .get(&name)
1288            .ok_or_else(|| repository_not_found(&name))?;
1289
1290        // Enumerate once per (digest, tag-or-untagged) combination.
1291        let mut all: Vec<(String, Option<String>)> = Vec::new();
1292        for (tag, digest) in &repo.image_tags {
1293            all.push((digest.clone(), Some(tag.clone())));
1294        }
1295        let tagged_digests: std::collections::HashSet<&String> = repo.image_tags.values().collect();
1296        for digest in repo.images.keys() {
1297            if !tagged_digests.contains(digest) {
1298                all.push((digest.clone(), None));
1299            }
1300        }
1301        // Apply filter.
1302        all.retain(|(_, tag)| match filter_tag_status.as_deref() {
1303            Some("TAGGED") => tag.is_some(),
1304            Some("UNTAGGED") => tag.is_none(),
1305            _ => true,
1306        });
1307        all.sort();
1308
1309        let start = offset.min(all.len());
1310        let end = (start + max_results).min(all.len());
1311        let ids: Vec<Value> = all[start..end]
1312            .iter()
1313            .map(|(d, t)| {
1314                let mut v = json!({ "imageDigest": d });
1315                if let Some(tag) = t {
1316                    v["imageTag"] = json!(tag);
1317                }
1318                v
1319            })
1320            .collect();
1321        let mut response = json!({ "imageIds": ids });
1322        if end < all.len() {
1323            response["nextToken"] = json!(end.to_string());
1324        }
1325        Ok(AwsResponse::ok_json(response))
1326    }
1327
1328    fn get_download_url_for_layer(
1329        &self,
1330        request: &AwsRequest,
1331    ) -> Result<AwsResponse, AwsServiceError> {
1332        let body = request.json_body();
1333        let name = req_str(&body, "repositoryName")?.to_string();
1334        let digest = req_str(&body, "layerDigest")?.to_string();
1335        let account = target_account_id(request, &body);
1336        let mut accounts = self.state.write();
1337        let state = accounts
1338            .get_mut(&account)
1339            .ok_or_else(|| repository_not_found(&name))?;
1340        let exclusions = pull_time_exclusion_set(state);
1341        let repo = state
1342            .repositories
1343            .get_mut(&name)
1344            .ok_or_else(|| repository_not_found(&name))?;
1345        check_repo_policy(
1346            &account,
1347            &request.account_id,
1348            &repo.repository_arn,
1349            &name,
1350            repo.policy.as_deref(),
1351            "ecr:GetDownloadUrlForLayer",
1352        )?;
1353        if !repo.layers.contains_key(&digest) {
1354            return Err(layer_not_found(&digest, &name));
1355        }
1356        // Pull bookkeeping: the OCI client requested a layer blob, which
1357        // means at least one image whose manifest references that layer
1358        // is being pulled. Touch every such image so DescribeImages
1359        // reflects the access. Don't touch unrelated images.
1360        let mut touched: Vec<String> = Vec::new();
1361        for (img_digest, img) in &repo.images {
1362            let parsed: Value = match serde_json::from_str(&img.image_manifest) {
1363                Ok(v) => v,
1364                Err(_) => continue,
1365            };
1366            let references = parsed
1367                .get("layers")
1368                .and_then(|v| v.as_array())
1369                .map(|arr| {
1370                    arr.iter()
1371                        .any(|l| l.get("digest").and_then(|d| d.as_str()) == Some(digest.as_str()))
1372                })
1373                .unwrap_or(false);
1374            if references {
1375                touched.push(img_digest.clone());
1376            }
1377        }
1378        let caller_arn = request.principal.as_ref().map(|p| p.arn.as_str());
1379        touch_image_pull(repo, &touched, caller_arn, &exclusions);
1380        // The OCI v2 endpoint hosts `/v2/<name>/blobs/<digest>` — return
1381        // that absolute URL so callers that trust the endpoint they're
1382        // already talking to can resolve it.
1383        let endpoint = accounts.endpoint();
1384        let url = format!(
1385            "{}/v2/{}/blobs/{}",
1386            endpoint.trim_end_matches('/'),
1387            name,
1388            digest
1389        );
1390        Ok(AwsResponse::ok_json(json!({
1391            "downloadUrl": url,
1392            "layerDigest": digest,
1393        })))
1394    }
1395
1396    fn initiate_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1397        let body = request.json_body();
1398        let name = req_str(&body, "repositoryName")?.to_string();
1399        let account = target_account_id(request, &body);
1400        let mut accounts = self.state.write();
1401        let state = accounts
1402            .get_mut(&account)
1403            .ok_or_else(|| repository_not_found(&name))?;
1404        let repo = state
1405            .repositories
1406            .get(&name)
1407            .ok_or_else(|| repository_not_found(&name))?;
1408        check_repo_policy(
1409            &account,
1410            &request.account_id,
1411            &repo.repository_arn,
1412            &name,
1413            repo.policy.as_deref(),
1414            "ecr:InitiateLayerUpload",
1415        )?;
1416        let upload_id = Uuid::new_v4().to_string();
1417        let spool = crate::oci::create_upload_spool(&upload_id).map_err(|e| {
1418            AwsServiceError::aws_error(
1419                StatusCode::INTERNAL_SERVER_ERROR,
1420                "InternalError",
1421                format!("failed to create upload spool: {e}"),
1422            )
1423        })?;
1424        state.layer_uploads.insert(
1425            upload_id.clone(),
1426            LayerUpload {
1427                upload_id: upload_id.clone(),
1428                repository_name: name,
1429                created_at: Utc::now(),
1430                spool_path: spool.to_string_lossy().to_string(),
1431                last_byte_received: 0,
1432            },
1433        );
1434        Ok(AwsResponse::ok_json(json!({
1435            "uploadId": upload_id,
1436            // Matches the real AWS default of 10 MiB.
1437            "partSize": 10_485_760u64,
1438        })))
1439    }
1440
1441    fn upload_layer_part(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1442        let body = request.json_body();
1443        let name = req_str(&body, "repositoryName")?.to_string();
1444        let upload_id = req_str(&body, "uploadId")?.to_string();
1445        let first_byte = body
1446            .get("partFirstByte")
1447            .and_then(|v| v.as_u64())
1448            .ok_or_else(|| invalid_parameter("Missing partFirstByte"))?;
1449        let last_byte = body
1450            .get("partLastByte")
1451            .and_then(|v| v.as_u64())
1452            .ok_or_else(|| invalid_parameter("Missing partLastByte"))?;
1453        let part_blob_b64 = req_str(&body, "layerPartBlob")?.to_string();
1454        let part_bytes = B64
1455            .decode(part_blob_b64.as_bytes())
1456            .map_err(|_| invalid_layer("layerPartBlob is not valid base64"))?;
1457        let account = target_account_id(request, &body);
1458        let mut accounts = self.state.write();
1459        let state = accounts
1460            .get_mut(&account)
1461            .ok_or_else(|| repository_not_found(&name))?;
1462        let repo = state
1463            .repositories
1464            .get(&name)
1465            .ok_or_else(|| repository_not_found(&name))?;
1466        check_repo_policy(
1467            &account,
1468            &request.account_id,
1469            &repo.repository_arn,
1470            &name,
1471            repo.policy.as_deref(),
1472            "ecr:UploadLayerPart",
1473        )?;
1474        let upload = state
1475            .layer_uploads
1476            .get_mut(&upload_id)
1477            .ok_or_else(|| upload_not_found(&upload_id))?;
1478        if upload.repository_name != name {
1479            return Err(upload_not_found(&upload_id));
1480        }
1481        if first_byte != upload.last_byte_received {
1482            return Err(invalid_layer(format!(
1483                "Layer part upload out of order: expected partFirstByte {} got {}",
1484                upload.last_byte_received, first_byte,
1485            )));
1486        }
1487        let expected_len = last_byte
1488            .checked_sub(first_byte)
1489            .and_then(|d| d.checked_add(1))
1490            .ok_or_else(|| invalid_layer("partLastByte < partFirstByte"))?;
1491        if part_bytes.len() as u64 != expected_len {
1492            return Err(invalid_layer(format!(
1493                "Layer part size mismatch: bytes {} doesn't match range [{first_byte}, {last_byte}]",
1494                part_bytes.len()
1495            )));
1496        }
1497        let spool = std::path::PathBuf::from(&upload.spool_path);
1498        crate::oci::append_bytes_sync(&spool, &part_bytes).map_err(|e| {
1499            AwsServiceError::aws_error(
1500                StatusCode::INTERNAL_SERVER_ERROR,
1501                "InternalError",
1502                format!("failed to append upload chunk: {e}"),
1503            )
1504        })?;
1505        upload.last_byte_received = last_byte + 1;
1506        Ok(AwsResponse::ok_json(json!({
1507            "registryId": state.registry_id(),
1508            "repositoryName": name,
1509            "uploadId": upload_id,
1510            "lastByteReceived": last_byte,
1511        })))
1512    }
1513
1514    fn get_authorization_token(
1515        &self,
1516        request: &AwsRequest,
1517    ) -> Result<AwsResponse, AwsServiceError> {
1518        let body = request.json_body();
1519        let registry_ids: Vec<String> = body
1520            .get("registryIds")
1521            .and_then(|v| v.as_array())
1522            .map(|arr| {
1523                arr.iter()
1524                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1525                    .collect()
1526            })
1527            .unwrap_or_default();
1528        let accounts = self.state.read();
1529        let default_account = accounts.default_account_id().to_string();
1530        let targets = if registry_ids.is_empty() {
1531            vec![default_account]
1532        } else {
1533            registry_ids
1534        };
1535        let endpoint = accounts.endpoint().to_string();
1536        drop(accounts);
1537        let expires_at = (Utc::now() + chrono::Duration::hours(12)).timestamp();
1538        let authorization_data: Vec<Value> = targets
1539            .into_iter()
1540            .map(|_registry_id| {
1541                let token = B64.encode(format!("AWS:{}", Uuid::new_v4()).as_bytes());
1542                json!({
1543                    "authorizationToken": token,
1544                    "expiresAt": expires_at,
1545                    "proxyEndpoint": endpoint,
1546                })
1547            })
1548            .collect();
1549        Ok(AwsResponse::ok_json(json!({
1550            "authorizationData": authorization_data,
1551        })))
1552    }
1553
1554    fn complete_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1555        let body = request.json_body();
1556        let name = req_str(&body, "repositoryName")?.to_string();
1557        let upload_id = req_str(&body, "uploadId")?.to_string();
1558        let digests: Vec<String> = body
1559            .get("layerDigests")
1560            .and_then(|v| v.as_array())
1561            .map(|arr| {
1562                arr.iter()
1563                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564                    .collect()
1565            })
1566            .unwrap_or_default();
1567        if digests.is_empty() {
1568            return Err(invalid_parameter(
1569                "At least one layerDigest must be supplied to CompleteLayerUpload",
1570            ));
1571        }
1572        let account = target_account_id(request, &body);
1573        let mut accounts = self.state.write();
1574        let state = accounts
1575            .get_mut(&account)
1576            .ok_or_else(|| repository_not_found(&name))?;
1577        let repo = state
1578            .repositories
1579            .get(&name)
1580            .ok_or_else(|| repository_not_found(&name))?;
1581        check_repo_policy(
1582            &account,
1583            &request.account_id,
1584            &repo.repository_arn,
1585            &name,
1586            repo.policy.as_deref(),
1587            "ecr:CompleteLayerUpload",
1588        )?;
1589        // Peek, validate, then commit — so a digest mismatch lets the
1590        // caller retry CompleteLayerUpload with the correct digest
1591        // instead of having to re-upload the entire blob.
1592        let upload = state
1593            .layer_uploads
1594            .get(&upload_id)
1595            .ok_or_else(|| upload_not_found(&upload_id))?;
1596        if upload.repository_name != name {
1597            return Err(upload_not_found(&upload_id));
1598        }
1599        let spool = std::path::PathBuf::from(&upload.spool_path);
1600        let blob_bytes = crate::oci::read_spool(&spool).map_err(|e| {
1601            AwsServiceError::aws_error(
1602                StatusCode::INTERNAL_SERVER_ERROR,
1603                "InternalError",
1604                format!("failed to read upload spool: {e}"),
1605            )
1606        })?;
1607        let computed = sha256_digest(&blob_bytes);
1608        if !digests.iter().any(|d| d == &computed) {
1609            // Spool stays — caller can retry with the correct digest
1610            // without re-uploading every UploadLayerPart chunk.
1611            return Err(AwsServiceError::aws_error(
1612                StatusCode::BAD_REQUEST,
1613                "LayerDigestMismatchException",
1614                format!(
1615                    "The layer digest from the client ({}) does not match the digest of the received bytes ({computed})",
1616                    digests.join(",")
1617                ),
1618            ));
1619        }
1620        let _upload = state.layer_uploads.remove(&upload_id).unwrap();
1621        crate::oci::unlink_spool(&spool);
1622        let size = blob_bytes.len() as u64;
1623        // Drop the write guard before the KMS encrypt call (which takes
1624        // its own lock). Re-acquire to insert.
1625        drop(accounts);
1626        let (stored_bytes, encrypted_with) =
1627            crate::oci::encrypt_layer_bytes(self, &account, &name, &blob_bytes);
1628        let mut accounts = self.state.write();
1629        let state = accounts
1630            .get_mut(&account)
1631            .ok_or_else(|| repository_not_found(&name))?;
1632        let repo = state
1633            .repositories
1634            .get_mut(&name)
1635            .ok_or_else(|| repository_not_found(&name))?;
1636        repo.layers.insert(
1637            computed.clone(),
1638            Layer {
1639                digest: computed.clone(),
1640                size,
1641                blob_b64: B64.encode(&stored_bytes),
1642                media_type: "application/vnd.docker.image.rootfs.diff.tar.gzip".to_string(),
1643                encrypted_with_kms_key: encrypted_with,
1644            },
1645        );
1646        let registry_id = repo.registry_id.clone();
1647        Ok(AwsResponse::ok_json(json!({
1648            "registryId": registry_id,
1649            "repositoryName": name,
1650            "uploadId": upload_id,
1651            "layerDigest": computed,
1652        })))
1653    }
1654}
1655
1656// -------- lifecycle + scan + registry + polish handlers (Batch 4) --------
1657
1658impl EcrService {
1659    fn put_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1660        let body = request.json_body();
1661        let name = req_str(&body, "repositoryName")?.to_string();
1662        let policy = req_str(&body, "lifecyclePolicyText")?.to_string();
1663        // Parse sanity-check.
1664        serde_json::from_str::<Value>(&policy)
1665            .map_err(|_| invalid_parameter("lifecyclePolicyText is not valid JSON"))?;
1666        let account = target_account_id(request, &body);
1667        let mut accounts = self.state.write();
1668        let state = accounts
1669            .get_mut(&account)
1670            .ok_or_else(|| repository_not_found(&name))?;
1671        let repo = state
1672            .repositories
1673            .get_mut(&name)
1674            .ok_or_else(|| repository_not_found(&name))?;
1675        repo.lifecycle_policy = Some(policy.clone());
1676        // Apply immediately so the store reflects the policy.
1677        let prune = evaluate_lifecycle_policy(repo, &policy);
1678        for digest in &prune {
1679            repo.images.remove(digest);
1680            repo.image_tags.retain(|_, d| d != digest);
1681        }
1682        repo.lifecycle_policy_last_evaluated_at = Some(Utc::now());
1683        let registry_id = repo.registry_id.clone();
1684        Ok(AwsResponse::ok_json(json!({
1685            "registryId": registry_id,
1686            "repositoryName": name,
1687            "lifecyclePolicyText": policy,
1688        })))
1689    }
1690
1691    fn get_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1692        let body = request.json_body();
1693        let name = req_str(&body, "repositoryName")?.to_string();
1694        let account = target_account_id(request, &body);
1695        let accounts = self.state.read();
1696        let state = accounts
1697            .get(&account)
1698            .ok_or_else(|| repository_not_found(&name))?;
1699        let repo = state
1700            .repositories
1701            .get(&name)
1702            .ok_or_else(|| repository_not_found(&name))?;
1703        let policy = repo
1704            .lifecycle_policy
1705            .clone()
1706            .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1707        let last_eval = repo
1708            .lifecycle_policy_last_evaluated_at
1709            .map(|t| t.timestamp())
1710            .unwrap_or(0);
1711        Ok(AwsResponse::ok_json(json!({
1712            "registryId": repo.registry_id,
1713            "repositoryName": name,
1714            "lifecyclePolicyText": policy,
1715            "lastEvaluatedAt": last_eval,
1716        })))
1717    }
1718
1719    fn delete_lifecycle_policy(
1720        &self,
1721        request: &AwsRequest,
1722    ) -> Result<AwsResponse, AwsServiceError> {
1723        let body = request.json_body();
1724        let name = req_str(&body, "repositoryName")?.to_string();
1725        let account = target_account_id(request, &body);
1726        let mut accounts = self.state.write();
1727        let state = accounts
1728            .get_mut(&account)
1729            .ok_or_else(|| repository_not_found(&name))?;
1730        let repo = state
1731            .repositories
1732            .get_mut(&name)
1733            .ok_or_else(|| repository_not_found(&name))?;
1734        let policy = repo
1735            .lifecycle_policy
1736            .take()
1737            .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1738        let last_eval = repo
1739            .lifecycle_policy_last_evaluated_at
1740            .take()
1741            .map(|t| t.timestamp())
1742            .unwrap_or(0);
1743        let registry_id = repo.registry_id.clone();
1744        Ok(AwsResponse::ok_json(json!({
1745            "registryId": registry_id,
1746            "repositoryName": name,
1747            "lifecyclePolicyText": policy,
1748            "lastEvaluatedAt": last_eval,
1749        })))
1750    }
1751
1752    fn start_lifecycle_policy_preview(
1753        &self,
1754        request: &AwsRequest,
1755    ) -> Result<AwsResponse, AwsServiceError> {
1756        let body = request.json_body();
1757        let name = req_str(&body, "repositoryName")?.to_string();
1758        let account = target_account_id(request, &body);
1759        let policy = match opt_str(&body, "lifecyclePolicyText") {
1760            Some(s) => s.to_string(),
1761            None => {
1762                let accounts = self.state.read();
1763                let state = accounts
1764                    .get(&account)
1765                    .ok_or_else(|| repository_not_found(&name))?;
1766                let repo = state
1767                    .repositories
1768                    .get(&name)
1769                    .ok_or_else(|| repository_not_found(&name))?;
1770                repo.lifecycle_policy
1771                    .clone()
1772                    .ok_or_else(|| lifecycle_policy_not_found(&name))?
1773            }
1774        };
1775        // Run the preview eval and stamp `last_evaluated_at` so callers
1776        // polling `GetLifecyclePolicy` after a preview see the most
1777        // recent evaluation marker. Preview is non-destructive — we
1778        // don't apply the prune here, just record that it ran.
1779        let mut accounts = self.state.write();
1780        let state = accounts
1781            .get_mut(&account)
1782            .ok_or_else(|| repository_not_found(&name))?;
1783        let repo = state
1784            .repositories
1785            .get_mut(&name)
1786            .ok_or_else(|| repository_not_found(&name))?;
1787        let _prune = evaluate_lifecycle_policy(repo, &policy);
1788        repo.lifecycle_policy_last_evaluated_at = Some(Utc::now());
1789        let registry_id = repo.registry_id.clone();
1790        Ok(AwsResponse::ok_json(json!({
1791            "registryId": registry_id,
1792            "repositoryName": name,
1793            "lifecyclePolicyText": policy,
1794            "status": "COMPLETE",
1795        })))
1796    }
1797
1798    fn get_lifecycle_policy_preview(
1799        &self,
1800        request: &AwsRequest,
1801    ) -> Result<AwsResponse, AwsServiceError> {
1802        let body = request.json_body();
1803        let name = req_str(&body, "repositoryName")?.to_string();
1804        let account = target_account_id(request, &body);
1805        let accounts = self.state.read();
1806        let state = accounts
1807            .get(&account)
1808            .ok_or_else(|| repository_not_found(&name))?;
1809        let repo = state
1810            .repositories
1811            .get(&name)
1812            .ok_or_else(|| repository_not_found(&name))?;
1813        let policy = repo
1814            .lifecycle_policy
1815            .clone()
1816            .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1817        let prune = evaluate_lifecycle_policy(repo, &policy);
1818        let results: Vec<Value> = prune
1819            .iter()
1820            .map(|digest| {
1821                json!({
1822                    "imageDigest": digest,
1823                    "imagePushedAt": repo.images.get(digest).map(|i| i.image_pushed_at.timestamp()).unwrap_or(0),
1824                    "action": {"type": "EXPIRE"},
1825                })
1826            })
1827            .collect();
1828        Ok(AwsResponse::ok_json(json!({
1829            "registryId": repo.registry_id,
1830            "repositoryName": name,
1831            "lifecyclePolicyText": policy,
1832            "status": "COMPLETE",
1833            "previewResults": results,
1834            "summary": {"expiringImageTotalCount": prune.len()},
1835        })))
1836    }
1837
1838    fn start_image_scan(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1839        use crate::state::ImageScanFindings;
1840        let body = request.json_body();
1841        let name = req_str(&body, "repositoryName")?.to_string();
1842        let image_id = body
1843            .get("imageId")
1844            .cloned()
1845            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
1846        let account = target_account_id(request, &body);
1847        let (digest, layers, registry_id) = {
1848            let mut accounts = self.state.write();
1849            let state = accounts
1850                .get_mut(&account)
1851                .ok_or_else(|| repository_not_found(&name))?;
1852            let repo = state
1853                .repositories
1854                .get_mut(&name)
1855                .ok_or_else(|| repository_not_found(&name))?;
1856            let digest = resolve_image_digest(repo, &image_id)
1857                .ok_or_else(|| image_not_found(&name, &image_id))?;
1858            // Mark scan IN_PROGRESS; real findings written by the
1859            // background scanner task. Caller polls
1860            // DescribeImageScanFindings to observe completion.
1861            repo.scan_findings.insert(
1862                digest.clone(),
1863                ImageScanFindings {
1864                    image_digest: digest.clone(),
1865                    scan_status: "IN_PROGRESS".to_string(),
1866                    scan_completed_at: None,
1867                    vulnerability_source_updated_at: None,
1868                    finding_severity_counts: BTreeMap::new(),
1869                    findings: Vec::new(),
1870                },
1871            );
1872            // Scope the scan to layers actually referenced by *this*
1873            // image's manifest. Other images sitting in the same repo
1874            // (different tag, different digest) must not contaminate
1875            // the findings — Trivy would otherwise report CVEs from
1876            // unrelated images.
1877            let layers = layers_for_image(repo, &digest);
1878            (digest, layers, repo.registry_id.clone())
1879        };
1880
1881        let shared = self.state.clone();
1882        let store = self.snapshot_store.clone();
1883        let snap_lock = self.snapshot_lock.clone();
1884        let account_for_task = account.clone();
1885        let name_for_task = name.clone();
1886        let digest_for_task = digest.clone();
1887        tokio::spawn(async move {
1888            let result = crate::scanner::scan_layers(&digest_for_task, &layers).await;
1889            {
1890                let mut accounts = shared.write();
1891                let Some(state) = accounts.get_mut(&account_for_task) else {
1892                    return;
1893                };
1894                let Some(repo) = state.repositories.get_mut(&name_for_task) else {
1895                    return;
1896                };
1897                let findings = result.unwrap_or_else(|| ImageScanFindings {
1898                    image_digest: digest_for_task.clone(),
1899                    scan_status: "COMPLETE".to_string(),
1900                    scan_completed_at: Some(Utc::now()),
1901                    vulnerability_source_updated_at: Some(Utc::now()),
1902                    finding_severity_counts: BTreeMap::new(),
1903                    findings: Vec::new(),
1904                });
1905                repo.scan_findings.insert(digest_for_task.clone(), findings);
1906            }
1907            // Persist the COMPLETE state — without this, restarts
1908            // restore the snapshot taken at IN_PROGRESS time and the
1909            // scan appears stuck forever.
1910            EcrService::save_snapshot_with(shared, store, snap_lock).await;
1911        });
1912
1913        Ok(AwsResponse::ok_json(json!({
1914            "registryId": registry_id,
1915            "repositoryName": name,
1916            "imageId": image_id,
1917            "imageScanStatus": {"status": "IN_PROGRESS"},
1918        })))
1919    }
1920
1921    fn describe_image_scan_findings(
1922        &self,
1923        request: &AwsRequest,
1924    ) -> Result<AwsResponse, AwsServiceError> {
1925        let body = request.json_body();
1926        let name = req_str(&body, "repositoryName")?.to_string();
1927        let image_id = body
1928            .get("imageId")
1929            .cloned()
1930            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
1931        let account = target_account_id(request, &body);
1932        let accounts = self.state.read();
1933        let state = accounts
1934            .get(&account)
1935            .ok_or_else(|| repository_not_found(&name))?;
1936        let repo = state
1937            .repositories
1938            .get(&name)
1939            .ok_or_else(|| repository_not_found(&name))?;
1940        let digest = resolve_image_digest(repo, &image_id)
1941            .ok_or_else(|| image_not_found(&name, &image_id))?;
1942        let findings = repo.scan_findings.get(&digest).cloned().unwrap_or_else(|| {
1943            crate::state::ImageScanFindings {
1944                image_digest: digest.clone(),
1945                scan_status: "COMPLETE".to_string(),
1946                scan_completed_at: Some(Utc::now()),
1947                vulnerability_source_updated_at: Some(Utc::now()),
1948                finding_severity_counts: BTreeMap::new(),
1949                findings: Vec::new(),
1950            }
1951        });
1952        Ok(AwsResponse::ok_json(json!({
1953            "registryId": repo.registry_id,
1954            "repositoryName": name,
1955            "imageId": image_id,
1956            "imageScanStatus": {"status": findings.scan_status},
1957            "imageScanFindings": {
1958                "imageScanCompletedAt": findings.scan_completed_at.map(|t| t.timestamp()),
1959                "vulnerabilitySourceUpdatedAt": findings.vulnerability_source_updated_at.map(|t| t.timestamp()),
1960                "findings": findings.findings,
1961                "findingSeverityCounts": findings.finding_severity_counts,
1962            },
1963        })))
1964    }
1965
1966    fn describe_registry(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1967        let body = request.json_body();
1968        let account = target_account_id(request, &body);
1969        let accounts = self.state.read();
1970        let state = accounts.get(&account);
1971        let registry_id = state
1972            .map(|s| s.account_id.clone())
1973            .unwrap_or_else(|| account.clone());
1974        let rules = state
1975            .and_then(|s| s.replication_configuration.as_ref())
1976            .map(|cfg| {
1977                cfg.rules
1978                    .iter()
1979                    .map(|r| {
1980                        json!({
1981                            "destinations": r.destinations.iter().map(|d| json!({
1982                                "region": d.region,
1983                                "registryId": d.registry_id,
1984                            })).collect::<Vec<_>>(),
1985                            "repositoryFilters": r.repository_filters.iter().map(|f| json!({
1986                                "filter": f.filter,
1987                                "filterType": f.filter_type,
1988                            })).collect::<Vec<_>>(),
1989                        })
1990                    })
1991                    .collect::<Vec<_>>()
1992            })
1993            .unwrap_or_default();
1994        Ok(AwsResponse::ok_json(json!({
1995            "registryId": registry_id,
1996            "replicationConfiguration": {"rules": rules},
1997        })))
1998    }
1999
2000    fn get_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2001        let body = request.json_body();
2002        let account = target_account_id(request, &body);
2003        let accounts = self.state.read();
2004        let state = accounts
2005            .get(&account)
2006            .ok_or_else(registry_policy_not_found)?;
2007        let policy = state
2008            .registry_policy
2009            .clone()
2010            .ok_or_else(registry_policy_not_found)?;
2011        Ok(AwsResponse::ok_json(json!({
2012            "registryId": state.account_id,
2013            "policyText": policy,
2014        })))
2015    }
2016
2017    fn put_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2018        let body = request.json_body();
2019        let policy = req_str(&body, "policyText")?.to_string();
2020        if policy.len() > 10_240 {
2021            return Err(invalid_parameter(format!(
2022                "Value at 'policyText' failed to satisfy constraint: \
2023                 Member must have length less than or equal to 10240 (got {})",
2024                policy.len()
2025            )));
2026        }
2027        let account = target_account_id(request, &body);
2028        let mut accounts = self.state.write();
2029        let state = accounts.get_or_create(&account);
2030        state.registry_policy = Some(policy.clone());
2031        Ok(AwsResponse::ok_json(json!({
2032            "registryId": state.account_id,
2033            "policyText": policy,
2034        })))
2035    }
2036
2037    fn delete_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2038        let body = request.json_body();
2039        let account = target_account_id(request, &body);
2040        let mut accounts = self.state.write();
2041        let state = accounts
2042            .get_mut(&account)
2043            .ok_or_else(registry_policy_not_found)?;
2044        let policy = state
2045            .registry_policy
2046            .take()
2047            .ok_or_else(registry_policy_not_found)?;
2048        Ok(AwsResponse::ok_json(json!({
2049            "registryId": state.account_id,
2050            "policyText": policy,
2051        })))
2052    }
2053
2054    fn get_registry_scanning_configuration(
2055        &self,
2056        request: &AwsRequest,
2057    ) -> Result<AwsResponse, AwsServiceError> {
2058        let body = request.json_body();
2059        let account = target_account_id(request, &body);
2060        let accounts = self.state.read();
2061        let state = accounts.get(&account);
2062        let cfg = state
2063            .map(|s| s.registry_scanning_configuration.clone())
2064            .unwrap_or_default();
2065        let rules: Vec<Value> = cfg
2066            .rules
2067            .iter()
2068            .map(|r| {
2069                json!({
2070                    "scanFrequency": r.scan_frequency,
2071                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2072                        "filter": f.filter,
2073                        "filterType": f.filter_type,
2074                    })).collect::<Vec<_>>(),
2075                })
2076            })
2077            .collect();
2078        Ok(AwsResponse::ok_json(json!({
2079            "registryId": state.map(|s| s.account_id.clone()).unwrap_or(account),
2080            "scanningConfiguration": {
2081                "scanType": cfg.scan_type,
2082                "rules": rules,
2083            },
2084        })))
2085    }
2086
2087    fn put_registry_scanning_configuration(
2088        &self,
2089        request: &AwsRequest,
2090    ) -> Result<AwsResponse, AwsServiceError> {
2091        use crate::state::{RegistryScanningConfiguration, RegistryScanningRule, RepositoryFilter};
2092        let body = request.json_body();
2093        let scan_type = opt_str(&body, "scanType").unwrap_or("BASIC").to_string();
2094        if scan_type != "BASIC" && scan_type != "ENHANCED" {
2095            return Err(invalid_parameter(format!(
2096                "Invalid scanType '{scan_type}'. Must be BASIC or ENHANCED."
2097            )));
2098        }
2099        let rules = body
2100            .get("rules")
2101            .and_then(|v| v.as_array())
2102            .cloned()
2103            .unwrap_or_default();
2104        let parsed_rules: Vec<RegistryScanningRule> = rules
2105            .iter()
2106            .map(|r| RegistryScanningRule {
2107                scan_frequency: r
2108                    .get("scanFrequency")
2109                    .and_then(|v| v.as_str())
2110                    .unwrap_or("SCAN_ON_PUSH")
2111                    .to_string(),
2112                repository_filters: r
2113                    .get("repositoryFilters")
2114                    .and_then(|v| v.as_array())
2115                    .map(|arr| {
2116                        arr.iter()
2117                            .map(|f| RepositoryFilter {
2118                                filter: f
2119                                    .get("filter")
2120                                    .and_then(|v| v.as_str())
2121                                    .unwrap_or("")
2122                                    .to_string(),
2123                                filter_type: f
2124                                    .get("filterType")
2125                                    .and_then(|v| v.as_str())
2126                                    .unwrap_or("WILDCARD")
2127                                    .to_string(),
2128                            })
2129                            .collect()
2130                    })
2131                    .unwrap_or_default(),
2132            })
2133            .collect();
2134        let account = target_account_id(request, &body);
2135        let mut accounts = self.state.write();
2136        let state = accounts.get_or_create(&account);
2137        state.registry_scanning_configuration = RegistryScanningConfiguration {
2138            scan_type: scan_type.clone(),
2139            rules: parsed_rules,
2140        };
2141        let cfg = state.registry_scanning_configuration.clone();
2142        Ok(AwsResponse::ok_json(json!({
2143            "registryScanningConfiguration": {
2144                "scanType": cfg.scan_type,
2145                "rules": cfg.rules.iter().map(|r| json!({
2146                    "scanFrequency": r.scan_frequency,
2147                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2148                        "filter": f.filter,
2149                        "filterType": f.filter_type,
2150                    })).collect::<Vec<_>>(),
2151                })).collect::<Vec<_>>(),
2152            },
2153        })))
2154    }
2155
2156    fn batch_get_repository_scanning_configuration(
2157        &self,
2158        request: &AwsRequest,
2159    ) -> Result<AwsResponse, AwsServiceError> {
2160        let body = request.json_body();
2161        let names: Vec<String> = body
2162            .get("repositoryNames")
2163            .and_then(|v| v.as_array())
2164            .ok_or_else(|| invalid_parameter("Missing required field: repositoryNames"))?
2165            .iter()
2166            .filter_map(|v| v.as_str().map(|s| s.to_string()))
2167            .collect();
2168        let account = target_account_id(request, &body);
2169        let accounts = self.state.read();
2170        let state = accounts
2171            .get(&account)
2172            .ok_or_else(|| repository_not_found(&account))?;
2173        let mut scanning: Vec<Value> = Vec::new();
2174        let mut failures: Vec<Value> = Vec::new();
2175        for n in &names {
2176            match state.repositories.get(n) {
2177                Some(repo) => scanning.push(json!({
2178                    "repositoryArn": repo.repository_arn,
2179                    "repositoryName": n,
2180                    "scanOnPush": repo.image_scanning_configuration.scan_on_push,
2181                    "scanFrequency": "SCAN_ON_PUSH",
2182                    "appliedScanFilters": [],
2183                })),
2184                None => failures.push(json!({
2185                    "repositoryName": n,
2186                    "failureCode": "REPOSITORY_NOT_FOUND",
2187                    "failureReason": format!("Repository '{n}' not found"),
2188                })),
2189            }
2190        }
2191        Ok(AwsResponse::ok_json(json!({
2192            "scanningConfigurations": scanning,
2193            "failures": failures,
2194        })))
2195    }
2196
2197    fn put_replication_configuration(
2198        &self,
2199        request: &AwsRequest,
2200    ) -> Result<AwsResponse, AwsServiceError> {
2201        use crate::state::{
2202            ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
2203        };
2204        let body = request.json_body();
2205        let cfg_value = body
2206            .get("replicationConfiguration")
2207            .cloned()
2208            .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
2209        let rules_value = cfg_value
2210            .get("rules")
2211            .and_then(|v| v.as_array())
2212            .cloned()
2213            .unwrap_or_default();
2214        let rules: Vec<ReplicationRule> = rules_value
2215            .iter()
2216            .map(|r| ReplicationRule {
2217                destinations: r
2218                    .get("destinations")
2219                    .and_then(|v| v.as_array())
2220                    .map(|arr| {
2221                        arr.iter()
2222                            .map(|d| ReplicationDestination {
2223                                region: d
2224                                    .get("region")
2225                                    .and_then(|v| v.as_str())
2226                                    .unwrap_or("")
2227                                    .to_string(),
2228                                registry_id: d
2229                                    .get("registryId")
2230                                    .and_then(|v| v.as_str())
2231                                    .unwrap_or("")
2232                                    .to_string(),
2233                            })
2234                            .collect()
2235                    })
2236                    .unwrap_or_default(),
2237                repository_filters: r
2238                    .get("repositoryFilters")
2239                    .and_then(|v| v.as_array())
2240                    .map(|arr| {
2241                        arr.iter()
2242                            .map(|f| RepositoryFilter {
2243                                filter: f
2244                                    .get("filter")
2245                                    .and_then(|v| v.as_str())
2246                                    .unwrap_or("")
2247                                    .to_string(),
2248                                filter_type: f
2249                                    .get("filterType")
2250                                    .and_then(|v| v.as_str())
2251                                    .unwrap_or("PREFIX_MATCH")
2252                                    .to_string(),
2253                            })
2254                            .collect()
2255                    })
2256                    .unwrap_or_default(),
2257            })
2258            .collect();
2259        let account = target_account_id(request, &body);
2260        let mut accounts = self.state.write();
2261        let state = accounts.get_or_create(&account);
2262        state.replication_configuration = Some(ReplicationConfiguration { rules });
2263        let cfg = state.replication_configuration.clone().unwrap();
2264        Ok(AwsResponse::ok_json(json!({
2265            "replicationConfiguration": {
2266                "rules": cfg.rules.iter().map(|r| json!({
2267                    "destinations": r.destinations.iter().map(|d| json!({
2268                        "region": d.region,
2269                        "registryId": d.registry_id,
2270                    })).collect::<Vec<_>>(),
2271                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2272                        "filter": f.filter,
2273                        "filterType": f.filter_type,
2274                    })).collect::<Vec<_>>(),
2275                })).collect::<Vec<_>>(),
2276            },
2277        })))
2278    }
2279
2280    fn describe_image_replication_status(
2281        &self,
2282        request: &AwsRequest,
2283    ) -> Result<AwsResponse, AwsServiceError> {
2284        let body = request.json_body();
2285        let name = req_str(&body, "repositoryName")?.to_string();
2286        let image_id = body
2287            .get("imageId")
2288            .cloned()
2289            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2290        let account = target_account_id(request, &body);
2291        let accounts = self.state.read();
2292        let state = accounts
2293            .get(&account)
2294            .ok_or_else(|| repository_not_found(&name))?;
2295        let repo = state
2296            .repositories
2297            .get(&name)
2298            .ok_or_else(|| repository_not_found(&name))?;
2299        let Some(digest) = resolve_image_digest(repo, &image_id) else {
2300            return Err(image_not_found(&name, &image_id));
2301        };
2302        let statuses: Vec<Value> = repo
2303            .replication_statuses
2304            .get(&digest)
2305            .map(|entries| {
2306                entries
2307                    .iter()
2308                    .map(|s| {
2309                        let mut obj = serde_json::Map::new();
2310                        obj.insert("region".into(), Value::String(s.region.clone()));
2311                        obj.insert("registryId".into(), Value::String(s.registry_id.clone()));
2312                        obj.insert("status".into(), Value::String(s.status.clone()));
2313                        if let Some(ref code) = s.failure_code {
2314                            obj.insert("failureCode".into(), Value::String(code.clone()));
2315                        }
2316                        if let Some(ref reason) = s.failure_reason {
2317                            obj.insert("failureReason".into(), Value::String(reason.clone()));
2318                        }
2319                        Value::Object(obj)
2320                    })
2321                    .collect()
2322            })
2323            .unwrap_or_default();
2324        Ok(AwsResponse::ok_json(json!({
2325            "repositoryName": name,
2326            "imageId": image_id,
2327            "replicationStatuses": statuses,
2328        })))
2329    }
2330
2331    fn create_pull_through_cache_rule(
2332        &self,
2333        request: &AwsRequest,
2334    ) -> Result<AwsResponse, AwsServiceError> {
2335        use crate::state::PullThroughCacheRule;
2336        let body = request.json_body();
2337        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2338        validate_pullthrough_prefix(&prefix)?;
2339        let upstream_url = req_str(&body, "upstreamRegistryUrl")?.to_string();
2340        let account = target_account_id(request, &body);
2341        let mut accounts = self.state.write();
2342        let state = accounts.get_or_create(&account);
2343        if state.pull_through_cache_rules.contains_key(&prefix) {
2344            return Err(AwsServiceError::aws_error(
2345                StatusCode::BAD_REQUEST,
2346                "PullThroughCacheRuleAlreadyExistsException",
2347                format!("A pull through cache rule with the prefix '{prefix}' already exists."),
2348            ));
2349        }
2350        let now = Utc::now();
2351        let rule = PullThroughCacheRule {
2352            ecr_repository_prefix: prefix.clone(),
2353            upstream_registry_url: upstream_url.clone(),
2354            upstream_registry: opt_str(&body, "upstreamRegistry").map(|s| s.to_string()),
2355            credential_arn: opt_str(&body, "credentialArn").map(|s| s.to_string()),
2356            created_at: now,
2357            updated_at: now,
2358            custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
2359        };
2360        state
2361            .pull_through_cache_rules
2362            .insert(prefix.clone(), rule.clone());
2363        Ok(AwsResponse::ok_json(pull_through_rule_json(
2364            state.account_id.as_str(),
2365            &rule,
2366        )))
2367    }
2368
2369    fn delete_pull_through_cache_rule(
2370        &self,
2371        request: &AwsRequest,
2372    ) -> Result<AwsResponse, AwsServiceError> {
2373        let body = request.json_body();
2374        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2375        validate_pullthrough_prefix(&prefix)?;
2376        let account = target_account_id(request, &body);
2377        let mut accounts = self.state.write();
2378        let state = accounts.get_or_create(&account);
2379        let removed = state
2380            .pull_through_cache_rules
2381            .remove(&prefix)
2382            .ok_or_else(|| {
2383                AwsServiceError::aws_error(
2384                    StatusCode::BAD_REQUEST,
2385                    "PullThroughCacheRuleNotFoundException",
2386                    format!("No pull through cache rule with prefix '{prefix}' exists."),
2387                )
2388            })?;
2389        // DeletePullThroughCacheRuleResponse omits upstreamRegistry per
2390        // the Smithy model — it only appears on Create/Describe.
2391        let mut response = pull_through_rule_json(state.account_id.as_str(), &removed);
2392        if let Value::Object(ref mut map) = response {
2393            map.remove("upstreamRegistry");
2394        }
2395        Ok(AwsResponse::ok_json(response))
2396    }
2397
2398    fn describe_pull_through_cache_rules(
2399        &self,
2400        request: &AwsRequest,
2401    ) -> Result<AwsResponse, AwsServiceError> {
2402        let body = request.json_body();
2403        validate_max_results(&body)?;
2404        let prefixes: Vec<String> = body
2405            .get("ecrRepositoryPrefixes")
2406            .and_then(|v| v.as_array())
2407            .map(|arr| {
2408                arr.iter()
2409                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2410                    .collect()
2411            })
2412            .unwrap_or_default();
2413        let account = target_account_id(request, &body);
2414        let accounts = self.state.read();
2415        let state = accounts.get(&account);
2416        let rules: Vec<&crate::state::PullThroughCacheRule> = state
2417            .map(|s| s.pull_through_cache_rules.values().collect())
2418            .unwrap_or_default();
2419        let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
2420        let filtered: Vec<Value> = rules
2421            .iter()
2422            .filter(|r| prefixes.is_empty() || prefixes.contains(&r.ecr_repository_prefix))
2423            .map(|r| pull_through_rule_json_with_updated(&registry_id, r))
2424            .collect();
2425        Ok(AwsResponse::ok_json(json!({
2426            "pullThroughCacheRules": filtered,
2427        })))
2428    }
2429
2430    fn update_pull_through_cache_rule(
2431        &self,
2432        request: &AwsRequest,
2433    ) -> Result<AwsResponse, AwsServiceError> {
2434        let body = request.json_body();
2435        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2436        let account = target_account_id(request, &body);
2437        let mut accounts = self.state.write();
2438        let state = accounts.get_or_create(&account);
2439        let rule = state
2440            .pull_through_cache_rules
2441            .get_mut(&prefix)
2442            .ok_or_else(|| {
2443                AwsServiceError::aws_error(
2444                    StatusCode::BAD_REQUEST,
2445                    "PullThroughCacheRuleNotFoundException",
2446                    format!("No pull through cache rule with prefix '{prefix}' exists."),
2447                )
2448            })?;
2449        if let Some(cred) = opt_str(&body, "credentialArn") {
2450            rule.credential_arn = Some(cred.to_string());
2451        }
2452        if let Some(role) = opt_str(&body, "customRoleArn") {
2453            rule.custom_role_arn = Some(role.to_string());
2454        }
2455        rule.updated_at = Utc::now();
2456        let response = pull_through_rule_json_with_updated(state.account_id.as_str(), rule);
2457        Ok(AwsResponse::ok_json(response))
2458    }
2459
2460    fn validate_pull_through_cache_rule(
2461        &self,
2462        request: &AwsRequest,
2463    ) -> Result<AwsResponse, AwsServiceError> {
2464        let body = request.json_body();
2465        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2466        let account = target_account_id(request, &body);
2467        let accounts = self.state.read();
2468        let state = accounts.get(&account);
2469        let rule = state
2470            .and_then(|s| s.pull_through_cache_rules.get(&prefix))
2471            .ok_or_else(|| {
2472                AwsServiceError::aws_error(
2473                    StatusCode::BAD_REQUEST,
2474                    "PullThroughCacheRuleNotFoundException",
2475                    format!("No pull through cache rule with prefix '{prefix}' exists."),
2476                )
2477            })?;
2478        let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
2479        let mut base = pull_through_rule_json(&registry_id, rule);
2480        base["isValid"] = json!(true);
2481        Ok(AwsResponse::ok_json(base))
2482    }
2483
2484    fn get_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2485        let body = request.json_body();
2486        let name = req_str(&body, "name")?.to_string();
2487        validate_account_setting_name(&name)?;
2488        let account = target_account_id(request, &body);
2489        let accounts = self.state.read();
2490        let state = accounts.get(&account);
2491        let value = state
2492            .and_then(|s| s.account_settings.get(&name).cloned())
2493            .unwrap_or_else(|| "DISABLED".to_string());
2494        Ok(AwsResponse::ok_json(json!({
2495            "name": name,
2496            "value": value,
2497        })))
2498    }
2499
2500    fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2501        let body = request.json_body();
2502        let name = req_str(&body, "name")?.to_string();
2503        validate_account_setting_name(&name)?;
2504        let value = req_str(&body, "value")?.to_string();
2505        let account = target_account_id(request, &body);
2506        let mut accounts = self.state.write();
2507        let state = accounts.get_or_create(&account);
2508        state.account_settings.insert(name.clone(), value.clone());
2509        Ok(AwsResponse::ok_json(json!({
2510            "name": name,
2511            "value": value,
2512        })))
2513    }
2514
2515    fn create_repository_creation_template(
2516        &self,
2517        request: &AwsRequest,
2518    ) -> Result<AwsResponse, AwsServiceError> {
2519        use crate::state::{EncryptionConfiguration as Enc, RepositoryCreationTemplate};
2520        let body = request.json_body();
2521        let prefix = req_str(&body, "prefix")?.to_string();
2522        validate_template_prefix(&prefix)?;
2523        let applied_for: Vec<String> = body
2524            .get("appliedFor")
2525            .and_then(|v| v.as_array())
2526            .map(|arr| {
2527                arr.iter()
2528                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2529                    .collect()
2530            })
2531            .unwrap_or_default();
2532        let image_tag_mutability = opt_str(&body, "imageTagMutability")
2533            .unwrap_or("MUTABLE")
2534            .to_string();
2535        let resource_tags = body
2536            .get("resourceTags")
2537            .and_then(|v| v.as_array())
2538            .cloned()
2539            .unwrap_or_default();
2540        let encryption = body.get("encryptionConfiguration").map(|v| Enc {
2541            encryption_type: v
2542                .get("encryptionType")
2543                .and_then(|x| x.as_str())
2544                .unwrap_or("AES256")
2545                .to_string(),
2546            kms_key: v
2547                .get("kmsKey")
2548                .and_then(|x| x.as_str())
2549                .map(|s| s.to_string()),
2550        });
2551        let account = target_account_id(request, &body);
2552        let mut accounts = self.state.write();
2553        let state = accounts.get_or_create(&account);
2554        if state.repository_creation_templates.contains_key(&prefix) {
2555            return Err(AwsServiceError::aws_error(
2556                StatusCode::BAD_REQUEST,
2557                "TemplateAlreadyExistsException",
2558                format!(
2559                    "A repository creation template with the prefix '{prefix}' already exists."
2560                ),
2561            ));
2562        }
2563        let now = Utc::now();
2564        let tpl = RepositoryCreationTemplate {
2565            prefix: prefix.clone(),
2566            description: opt_str(&body, "description").map(|s| s.to_string()),
2567            image_tag_mutability,
2568            applied_for,
2569            resource_tags,
2570            created_at: now,
2571            updated_at: now,
2572            custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
2573            repository_policy: opt_str(&body, "repositoryPolicy").map(|s| s.to_string()),
2574            lifecycle_policy: opt_str(&body, "lifecyclePolicy").map(|s| s.to_string()),
2575            encryption_configuration: encryption,
2576        };
2577        state
2578            .repository_creation_templates
2579            .insert(prefix, tpl.clone());
2580        Ok(AwsResponse::ok_json(json!({
2581            "registryId": state.account_id,
2582            "repositoryCreationTemplate": template_to_json(&tpl),
2583        })))
2584    }
2585
2586    fn delete_repository_creation_template(
2587        &self,
2588        request: &AwsRequest,
2589    ) -> Result<AwsResponse, AwsServiceError> {
2590        let body = request.json_body();
2591        let prefix = req_str(&body, "prefix")?.to_string();
2592        validate_template_prefix(&prefix)?;
2593        let account = target_account_id(request, &body);
2594        let mut accounts = self.state.write();
2595        let state = accounts.get_or_create(&account);
2596        let removed = state
2597            .repository_creation_templates
2598            .remove(&prefix)
2599            .ok_or_else(|| {
2600                AwsServiceError::aws_error(
2601                    StatusCode::BAD_REQUEST,
2602                    "TemplateNotFoundException",
2603                    format!("No repository creation template with prefix '{prefix}' exists."),
2604                )
2605            })?;
2606        Ok(AwsResponse::ok_json(json!({
2607            "registryId": state.account_id,
2608            "repositoryCreationTemplate": template_to_json(&removed),
2609        })))
2610    }
2611
2612    fn describe_repository_creation_templates(
2613        &self,
2614        request: &AwsRequest,
2615    ) -> Result<AwsResponse, AwsServiceError> {
2616        let body = request.json_body();
2617        validate_max_results(&body)?;
2618        let prefixes: Vec<String> = body
2619            .get("prefixes")
2620            .and_then(|v| v.as_array())
2621            .map(|arr| {
2622                arr.iter()
2623                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2624                    .collect()
2625            })
2626            .unwrap_or_default();
2627        let account = target_account_id(request, &body);
2628        let accounts = self.state.read();
2629        let state = accounts.get(&account);
2630        let tpls: Vec<Value> = state
2631            .map(|s| {
2632                s.repository_creation_templates
2633                    .values()
2634                    .filter(|t| prefixes.is_empty() || prefixes.contains(&t.prefix))
2635                    .map(template_to_json)
2636                    .collect()
2637            })
2638            .unwrap_or_default();
2639        Ok(AwsResponse::ok_json(json!({
2640            "registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
2641            "repositoryCreationTemplates": tpls,
2642        })))
2643    }
2644
2645    fn update_repository_creation_template(
2646        &self,
2647        request: &AwsRequest,
2648    ) -> Result<AwsResponse, AwsServiceError> {
2649        let body = request.json_body();
2650        let prefix = req_str(&body, "prefix")?.to_string();
2651        validate_template_prefix(&prefix)?;
2652        let account = target_account_id(request, &body);
2653        let mut accounts = self.state.write();
2654        let state = accounts.get_or_create(&account);
2655        let tpl = state
2656            .repository_creation_templates
2657            .get_mut(&prefix)
2658            .ok_or_else(|| {
2659                AwsServiceError::aws_error(
2660                    StatusCode::BAD_REQUEST,
2661                    "TemplateNotFoundException",
2662                    format!("No repository creation template with prefix '{prefix}' exists."),
2663                )
2664            })?;
2665        if let Some(desc) = opt_str(&body, "description") {
2666            tpl.description = Some(desc.to_string());
2667        }
2668        if let Some(mutability) = opt_str(&body, "imageTagMutability") {
2669            tpl.image_tag_mutability = mutability.to_string();
2670        }
2671        if let Some(arr) = body.get("appliedFor").and_then(|v| v.as_array()) {
2672            tpl.applied_for = arr
2673                .iter()
2674                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2675                .collect();
2676        }
2677        if let Some(arr) = body.get("resourceTags").and_then(|v| v.as_array()) {
2678            tpl.resource_tags = arr.clone();
2679        }
2680        tpl.updated_at = Utc::now();
2681        Ok(AwsResponse::ok_json(json!({
2682            "registryId": state.account_id,
2683            "repositoryCreationTemplate": template_to_json(tpl),
2684        })))
2685    }
2686
2687    fn get_signing_configuration(
2688        &self,
2689        request: &AwsRequest,
2690    ) -> Result<AwsResponse, AwsServiceError> {
2691        let body = request.json_body();
2692        let account = target_account_id(request, &body);
2693        let accounts = self.state.read();
2694        let state = accounts.get(&account);
2695        let rules: Vec<Value> = state
2696            .and_then(|s| s.signing_configuration.as_ref())
2697            .map(|c| c.rules.clone())
2698            .unwrap_or_default();
2699        Ok(AwsResponse::ok_json(json!({
2700            "registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
2701            "signingConfiguration": {"rules": rules},
2702        })))
2703    }
2704
2705    fn put_signing_configuration(
2706        &self,
2707        request: &AwsRequest,
2708    ) -> Result<AwsResponse, AwsServiceError> {
2709        use crate::signing::TrustedKey;
2710        use crate::state::SigningConfiguration;
2711        let body = request.json_body();
2712        let cfg = body
2713            .get("signingConfiguration")
2714            .ok_or_else(|| invalid_parameter("Missing required field: signingConfiguration"))?;
2715        let rules: Vec<Value> = cfg
2716            .get("rules")
2717            .and_then(|v| v.as_array())
2718            .cloned()
2719            .unwrap_or_default();
2720
2721        // Extract trusted keys from the rules. AWS's real signing-config
2722        // schema is nested; we accept the minimal fakecloud-friendly
2723        // shape `{trustedKeys: [{keyId, pem, algorithm}]}` per rule.
2724        // Rules that don't carry recognised keys are still
2725        // round-trippable via the raw `rules` passthrough.
2726        let mut trusted_keys: Vec<TrustedKey> = Vec::new();
2727        for rule in &rules {
2728            let keys = match rule.get("trustedKeys").and_then(|v| v.as_array()) {
2729                Some(k) => k,
2730                None => continue,
2731            };
2732            for k in keys {
2733                let key_id = k
2734                    .get("keyId")
2735                    .and_then(|v| v.as_str())
2736                    .unwrap_or_default()
2737                    .to_string();
2738                let pem = match k.get("pem").and_then(|v| v.as_str()) {
2739                    Some(p) => p.to_string(),
2740                    None => continue,
2741                };
2742                let algorithm = k
2743                    .get("algorithm")
2744                    .and_then(|v| v.as_str())
2745                    .unwrap_or("ECDSA-P256")
2746                    .to_string();
2747                // Validate the PEM up front so bad rules fail
2748                // PutSigningConfiguration rather than silently skipping
2749                // verification at describe time.
2750                if <p256::ecdsa::VerifyingKey as p256::pkcs8::DecodePublicKey>::from_public_key_pem(
2751                    &pem,
2752                )
2753                .is_err()
2754                {
2755                    return Err(invalid_parameter(format!(
2756                        "trusted key {key_id} is not a valid ECDSA-P256 PEM-encoded public key"
2757                    )));
2758                }
2759                trusted_keys.push(TrustedKey {
2760                    key_id,
2761                    pem,
2762                    algorithm,
2763                });
2764            }
2765        }
2766
2767        let account = target_account_id(request, &body);
2768        let mut accounts = self.state.write();
2769        let state = accounts.get_or_create(&account);
2770        state.signing_configuration = Some(SigningConfiguration {
2771            rules: rules.clone(),
2772            trusted_keys,
2773        });
2774        Ok(AwsResponse::ok_json(json!({
2775            "signingConfiguration": {"rules": rules},
2776        })))
2777    }
2778
2779    fn delete_signing_configuration(
2780        &self,
2781        request: &AwsRequest,
2782    ) -> Result<AwsResponse, AwsServiceError> {
2783        let body = request.json_body();
2784        let account = target_account_id(request, &body);
2785        let mut accounts = self.state.write();
2786        let state = accounts.get_or_create(&account);
2787        state.signing_configuration = None;
2788        Ok(AwsResponse::ok_json(json!({})))
2789    }
2790
2791    fn describe_image_signing_status(
2792        &self,
2793        request: &AwsRequest,
2794    ) -> Result<AwsResponse, AwsServiceError> {
2795        let body = request.json_body();
2796        let name = req_str(&body, "repositoryName")?.to_string();
2797        let image_id = body
2798            .get("imageId")
2799            .cloned()
2800            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2801        let account = target_account_id(request, &body);
2802        let accounts = self.state.read();
2803        let state = accounts
2804            .get(&account)
2805            .ok_or_else(|| repository_not_found(&name))?;
2806        let repo = state
2807            .repositories
2808            .get(&name)
2809            .ok_or_else(|| repository_not_found(&name))?;
2810        let image_digest = resolve_image_digest(repo, &image_id)
2811            .ok_or_else(|| image_not_found(&name, &image_id))?;
2812
2813        let trusted_keys: &[crate::signing::TrustedKey] = state
2814            .signing_configuration
2815            .as_ref()
2816            .map(|c| c.trusted_keys.as_slice())
2817            .unwrap_or(&[]);
2818
2819        // Locate the cosign companion signature tag
2820        // (`sha256-<hex>.sig`) in the same repo. Absent -> UNSIGNED.
2821        let sig_tag = match crate::signing::companion_sig_tag(&image_digest) {
2822            Some(t) => t,
2823            None => {
2824                return Ok(AwsResponse::ok_json(json!({
2825                    "registryId": repo.registry_id,
2826                    "repositoryName": name,
2827                    "imageId": image_id,
2828                    "imageSignatures": [],
2829                    "signingStatus": "UNSIGNED",
2830                })));
2831            }
2832        };
2833        let sig_manifest_digest = match repo.image_tags.get(&sig_tag) {
2834            Some(d) => d,
2835            None => {
2836                return Ok(AwsResponse::ok_json(json!({
2837                    "registryId": repo.registry_id,
2838                    "repositoryName": name,
2839                    "imageId": image_id,
2840                    "imageSignatures": [],
2841                    "signingStatus": "UNSIGNED",
2842                })));
2843            }
2844        };
2845        let sig_image = match repo.images.get(sig_manifest_digest) {
2846            Some(i) => i,
2847            None => {
2848                return Ok(AwsResponse::ok_json(json!({
2849                    "registryId": repo.registry_id,
2850                    "repositoryName": name,
2851                    "imageId": image_id,
2852                    "imageSignatures": [],
2853                    "signingStatus": "UNSIGNED",
2854                })));
2855            }
2856        };
2857
2858        let manifest_json: Value = match serde_json::from_str(&sig_image.image_manifest) {
2859            Ok(v) => v,
2860            Err(_) => {
2861                return Ok(AwsResponse::ok_json(json!({
2862                    "registryId": repo.registry_id,
2863                    "repositoryName": name,
2864                    "imageId": image_id,
2865                    "imageSignatures": [],
2866                    "signingStatus": "INVALID_SIGNATURE",
2867                })));
2868            }
2869        };
2870        let (layer_digest, signature_b64) =
2871            match crate::signing::extract_signature_annotation(&manifest_json) {
2872                Some(x) => x,
2873                None => {
2874                    return Ok(AwsResponse::ok_json(json!({
2875                        "registryId": repo.registry_id,
2876                        "repositoryName": name,
2877                        "imageId": image_id,
2878                        "imageSignatures": [],
2879                        "signingStatus": "UNSIGNED",
2880                    })));
2881                }
2882            };
2883
2884        // Fetch the payload (signed bytes = simple-signing JSON blob).
2885        let payload_bytes: Vec<u8> = match repo.layers.get(&layer_digest) {
2886            Some(layer) => base64::Engine::decode(
2887                &base64::engine::general_purpose::STANDARD,
2888                layer.blob_b64.as_bytes(),
2889            )
2890            .unwrap_or_default(),
2891            None => {
2892                return Ok(AwsResponse::ok_json(json!({
2893                    "registryId": repo.registry_id,
2894                    "repositoryName": name,
2895                    "imageId": image_id,
2896                    "imageSignatures": [],
2897                    "signingStatus": "UNSIGNED",
2898                })));
2899            }
2900        };
2901
2902        // The payload must name the signed image — catches copy-paste
2903        // of one image's signature onto another.
2904        if let Some(named) = crate::signing::referenced_image_digest(&payload_bytes) {
2905            if named != image_digest {
2906                return Ok(AwsResponse::ok_json(json!({
2907                    "registryId": repo.registry_id,
2908                    "repositoryName": name,
2909                    "imageId": image_id,
2910                    "imageSignatures": [],
2911                    "signingStatus": "INVALID_SIGNATURE",
2912                    "statusReason": "signature payload references a different image digest",
2913                })));
2914            }
2915        }
2916
2917        // Verify against each trusted key until one matches.
2918        let mut matched: Option<&crate::signing::TrustedKey> = None;
2919        for key in trusted_keys {
2920            if crate::signing::verify_cosign_signature(&key.pem, &payload_bytes, &signature_b64)
2921                .is_ok()
2922            {
2923                matched = Some(key);
2924                break;
2925            }
2926        }
2927
2928        let mut response = json!({
2929            "registryId": repo.registry_id,
2930            "repositoryName": name,
2931            "imageId": image_id,
2932        });
2933        if let Some(key) = matched {
2934            response["imageSignatures"] = json!([{
2935                "signatureFormat": "COSIGN",
2936                "keyId": key.key_id,
2937                "algorithm": key.algorithm,
2938                "valid": true,
2939            }]);
2940            response["signingStatus"] = json!("SIGNED");
2941        } else if trusted_keys.is_empty() {
2942            // A valid-looking signature exists but no trusted keys are
2943            // configured to verify against — surface the signature but
2944            // mark it UNVERIFIED so the caller knows to configure keys.
2945            response["imageSignatures"] = json!([{
2946                "signatureFormat": "COSIGN",
2947                "valid": false,
2948                "statusReason": "no trusted keys configured"
2949            }]);
2950            response["signingStatus"] = json!("UNVERIFIED");
2951        } else {
2952            response["imageSignatures"] = json!([{
2953                "signatureFormat": "COSIGN",
2954                "valid": false,
2955                "statusReason": "signature did not match any trusted key"
2956            }]);
2957            response["signingStatus"] = json!("INVALID_SIGNATURE");
2958        }
2959        Ok(AwsResponse::ok_json(response))
2960    }
2961
2962    fn register_pull_time_update_exclusion(
2963        &self,
2964        request: &AwsRequest,
2965    ) -> Result<AwsResponse, AwsServiceError> {
2966        use crate::state::PullTimeExclusion;
2967        let body = request.json_body();
2968        let principal_arn = req_str(&body, "principalArn")?.to_string();
2969        // Smithy `com.amazonaws.ecr#PrincipalArn` length 0..=200.
2970        validate_string_length("principalArn", &principal_arn, 0, 200)?;
2971        let account = target_account_id(request, &body);
2972        let mut accounts = self.state.write();
2973        let state = accounts.get_or_create(&account);
2974        state
2975            .pull_time_exclusions
2976            .entry(principal_arn.clone())
2977            .or_insert_with(|| PullTimeExclusion {
2978                principal_arn: principal_arn.clone(),
2979                registered_at: Utc::now(),
2980            });
2981        Ok(AwsResponse::ok_json(json!({
2982            "principalArn": principal_arn,
2983        })))
2984    }
2985
2986    fn deregister_pull_time_update_exclusion(
2987        &self,
2988        request: &AwsRequest,
2989    ) -> Result<AwsResponse, AwsServiceError> {
2990        let body = request.json_body();
2991        let principal_arn = req_str(&body, "principalArn")?.to_string();
2992        validate_string_length("principalArn", &principal_arn, 0, 200)?;
2993        let account = target_account_id(request, &body);
2994        let mut accounts = self.state.write();
2995        let state = accounts.get_or_create(&account);
2996        state.pull_time_exclusions.remove(&principal_arn);
2997        Ok(AwsResponse::ok_json(json!({
2998            "principalArn": principal_arn,
2999        })))
3000    }
3001
3002    fn list_pull_time_update_exclusions(
3003        &self,
3004        request: &AwsRequest,
3005    ) -> Result<AwsResponse, AwsServiceError> {
3006        let body = request.json_body();
3007        validate_max_results(&body)?;
3008        let account = target_account_id(request, &body);
3009        let accounts = self.state.read();
3010        let state = accounts.get(&account);
3011        let exclusions: Vec<Value> = state
3012            .map(|s| {
3013                s.pull_time_exclusions
3014                    .values()
3015                    .map(|e| {
3016                        json!({
3017                            "principalArn": e.principal_arn,
3018                            "registeredAt": e.registered_at.timestamp(),
3019                        })
3020                    })
3021                    .collect()
3022            })
3023            .unwrap_or_default();
3024        Ok(AwsResponse::ok_json(json!({
3025            "pullTimeUpdateExclusions": exclusions,
3026        })))
3027    }
3028
3029    fn list_image_referrers(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3030        let body = request.json_body();
3031        let name = req_str(&body, "repositoryName")?.to_string();
3032        let subject = body
3033            .get("subjectId")
3034            .cloned()
3035            .ok_or_else(|| invalid_parameter("Missing subjectId"))?;
3036        let digest = subject
3037            .get("imageDigest")
3038            .and_then(|v| v.as_str())
3039            .ok_or_else(|| invalid_parameter("subjectId.imageDigest is required"))?
3040            .to_string();
3041        // Optional filter: artifactTypes + artifactStatus. Defaults match
3042        // AWS: `artifactStatus = ACTIVE` only when no filter is supplied.
3043        let filter = body.get("filter").cloned().unwrap_or(Value::Null);
3044        let artifact_type_filter: Option<Vec<String>> = filter
3045            .get("artifactTypes")
3046            .and_then(|v| v.as_array())
3047            .map(|arr| {
3048                arr.iter()
3049                    .filter_map(|v| v.as_str().map(str::to_string))
3050                    .collect()
3051            });
3052        let artifact_status_filter: String = filter
3053            .get("artifactStatus")
3054            .and_then(|v| v.as_str())
3055            .unwrap_or("ACTIVE")
3056            .to_string();
3057        let account = target_account_id(request, &body);
3058        let accounts = self.state.read();
3059        let state = accounts
3060            .get(&account)
3061            .ok_or_else(|| repository_not_found(&name))?;
3062        let repo = state
3063            .repositories
3064            .get(&name)
3065            .ok_or_else(|| repository_not_found(&name))?;
3066        if !repo.images.contains_key(&digest) {
3067            return Err(AwsServiceError::aws_error(
3068                StatusCode::BAD_REQUEST,
3069                "ImageNotFoundException",
3070                format!("Subject image {digest} not found in repository '{name}'"),
3071            ));
3072        }
3073        // Walk every image; include those whose manifest has a
3074        // `subject.digest` matching the requested subject. The subject
3075        // is the OCI Distribution v1.1 referrer relation.
3076        let mut referrers: Vec<Value> = Vec::new();
3077        for image in repo.images.values() {
3078            let parsed: Value = match serde_json::from_str(&image.image_manifest) {
3079                Ok(v) => v,
3080                Err(_) => continue,
3081            };
3082            let subject_digest = parsed
3083                .get("subject")
3084                .and_then(|s| s.get("digest"))
3085                .and_then(|d| d.as_str());
3086            if subject_digest != Some(digest.as_str()) {
3087                continue;
3088            }
3089            // OCI artifact type lives at `artifactType` (image manifest)
3090            // or `config.mediaType` (image-config-as-artifact).
3091            let artifact_type = parsed
3092                .get("artifactType")
3093                .and_then(|v| v.as_str())
3094                .or_else(|| {
3095                    parsed
3096                        .get("config")
3097                        .and_then(|c| c.get("mediaType"))
3098                        .and_then(|v| v.as_str())
3099                })
3100                .map(str::to_string);
3101            if let Some(ref allowed) = artifact_type_filter {
3102                if !allowed.iter().any(|t| Some(t) == artifact_type.as_ref()) {
3103                    continue;
3104                }
3105            }
3106            // Status filter: `ANY` matches everything; otherwise the
3107            // referrer's `image_status` must equal the filter value.
3108            if artifact_status_filter != "ANY" && image.image_status != artifact_status_filter {
3109                continue;
3110            }
3111            // Annotations live on the manifest under `annotations`.
3112            let annotations = parsed
3113                .get("annotations")
3114                .cloned()
3115                .unwrap_or_else(|| json!({}));
3116            let mut referrer = json!({
3117                "digest": image.image_digest,
3118                "mediaType": image.image_manifest_media_type,
3119                "size": image.image_size_in_bytes,
3120                "annotations": annotations,
3121                "artifactStatus": image.image_status,
3122            });
3123            if let Some(t) = artifact_type {
3124                referrer["artifactType"] = json!(t);
3125            }
3126            referrers.push(referrer);
3127        }
3128        Ok(AwsResponse::ok_json(json!({
3129            "referrers": referrers,
3130        })))
3131    }
3132
3133    fn update_image_storage_class(
3134        &self,
3135        request: &AwsRequest,
3136    ) -> Result<AwsResponse, AwsServiceError> {
3137        let body = request.json_body();
3138        let name = req_str(&body, "repositoryName")?.to_string();
3139        let image_id = body
3140            .get("imageId")
3141            .cloned()
3142            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
3143        let target_class = req_str(&body, "targetStorageClass")?.to_string();
3144        // Smithy `TargetStorageClass` is `STANDARD | ARCHIVE`. Anything
3145        // else is rejected the way AWS would reject an enum mismatch.
3146        if target_class != "STANDARD" && target_class != "ARCHIVE" {
3147            return Err(invalid_parameter(format!(
3148                "Invalid targetStorageClass '{target_class}'. Must be STANDARD or ARCHIVE."
3149            )));
3150        }
3151        let account = target_account_id(request, &body);
3152        let mut accounts = self.state.write();
3153        let state = accounts
3154            .get_mut(&account)
3155            .ok_or_else(|| repository_not_found(&name))?;
3156        let repo = state
3157            .repositories
3158            .get_mut(&name)
3159            .ok_or_else(|| repository_not_found(&name))?;
3160        let digest = resolve_image_digest(repo, &image_id)
3161            .ok_or_else(|| image_not_found(&name, &image_id))?;
3162        let registry_id = repo.registry_id.clone();
3163        let now = Utc::now();
3164        let image = repo.images.get_mut(&digest).expect("digest resolves");
3165        let new_status = match target_class.as_str() {
3166            "ARCHIVE" => {
3167                image.last_archived_at = Some(now);
3168                "ARCHIVED"
3169            }
3170            // STANDARD = restore from archive.
3171            _ => {
3172                image.last_activated_at = Some(now);
3173                "ACTIVE"
3174            }
3175        };
3176        image.image_status = new_status.to_string();
3177        Ok(AwsResponse::ok_json(json!({
3178            "registryId": registry_id,
3179            "repositoryName": name,
3180            "imageId": image_id,
3181            "imageStatus": new_status,
3182        })))
3183    }
3184}
3185
3186#[path = "service_helpers.rs"]
3187mod service_helpers;
3188pub use service_helpers::evaluate_lifecycle_policy;
3189pub(crate) use service_helpers::*;
3190
3191#[cfg(test)]
3192#[path = "service_tests.rs"]
3193mod tests;