Skip to main content

fakecloud_ecr/
service.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19    EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20    Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const SUPPORTED_ACTIONS: &[&str] = &[
24    "CreateRepository",
25    "DeleteRepository",
26    "DescribeRepositories",
27    "PutImageTagMutability",
28    "PutImageScanningConfiguration",
29    "SetRepositoryPolicy",
30    "GetRepositoryPolicy",
31    "DeleteRepositoryPolicy",
32    "TagResource",
33    "UntagResource",
34    "ListTagsForResource",
35    "PutImage",
36    "BatchGetImage",
37    "BatchDeleteImage",
38    "BatchCheckLayerAvailability",
39    "DescribeImages",
40    "ListImages",
41    "GetDownloadUrlForLayer",
42    "InitiateLayerUpload",
43    "UploadLayerPart",
44    "CompleteLayerUpload",
45    "GetAuthorizationToken",
46    "PutLifecyclePolicy",
47    "GetLifecyclePolicy",
48    "DeleteLifecyclePolicy",
49    "StartLifecyclePolicyPreview",
50    "GetLifecyclePolicyPreview",
51    "StartImageScan",
52    "DescribeImageScanFindings",
53    "DescribeRegistry",
54    "GetRegistryPolicy",
55    "PutRegistryPolicy",
56    "DeleteRegistryPolicy",
57    "GetRegistryScanningConfiguration",
58    "PutRegistryScanningConfiguration",
59    "BatchGetRepositoryScanningConfiguration",
60    "PutReplicationConfiguration",
61    "DescribeImageReplicationStatus",
62    "CreatePullThroughCacheRule",
63    "DeletePullThroughCacheRule",
64    "DescribePullThroughCacheRules",
65    "UpdatePullThroughCacheRule",
66    "ValidatePullThroughCacheRule",
67    "GetAccountSetting",
68    "PutAccountSetting",
69    "CreateRepositoryCreationTemplate",
70    "DeleteRepositoryCreationTemplate",
71    "DescribeRepositoryCreationTemplates",
72    "UpdateRepositoryCreationTemplate",
73    "GetSigningConfiguration",
74    "PutSigningConfiguration",
75    "DeleteSigningConfiguration",
76    "DescribeImageSigningStatus",
77    "RegisterPullTimeUpdateExclusion",
78    "DeregisterPullTimeUpdateExclusion",
79    "ListPullTimeUpdateExclusions",
80    "ListImageReferrers",
81    "UpdateImageStorageClass",
82];
83
84/// Actions that mutate persisted state. Only these trigger a snapshot save.
85fn is_mutating(action: &str) -> bool {
86    matches!(
87        action,
88        "CreateRepository"
89            | "DeleteRepository"
90            | "PutImageTagMutability"
91            | "PutImageScanningConfiguration"
92            | "SetRepositoryPolicy"
93            | "DeleteRepositoryPolicy"
94            | "TagResource"
95            | "UntagResource"
96            | "PutImage"
97            | "BatchDeleteImage"
98            | "InitiateLayerUpload"
99            | "UploadLayerPart"
100            | "CompleteLayerUpload"
101            | "PutLifecyclePolicy"
102            | "DeleteLifecyclePolicy"
103            | "StartLifecyclePolicyPreview"
104            | "StartImageScan"
105            | "PutRegistryPolicy"
106            | "DeleteRegistryPolicy"
107            | "PutRegistryScanningConfiguration"
108            | "PutReplicationConfiguration"
109            | "CreatePullThroughCacheRule"
110            | "DeletePullThroughCacheRule"
111            | "UpdatePullThroughCacheRule"
112            | "PutAccountSetting"
113            | "CreateRepositoryCreationTemplate"
114            | "DeleteRepositoryCreationTemplate"
115            | "UpdateRepositoryCreationTemplate"
116            | "PutSigningConfiguration"
117            | "DeleteSigningConfiguration"
118            | "RegisterPullTimeUpdateExclusion"
119            | "DeregisterPullTimeUpdateExclusion"
120            | "UpdateImageStorageClass"
121    )
122}
123
124pub struct EcrService {
125    state: SharedEcrState,
126    snapshot_store: Option<Arc<dyn SnapshotStore>>,
127    snapshot_lock: Arc<AsyncMutex<()>>,
128    /// KMS state handle — when wired, repositories configured with
129    /// `EncryptionConfiguration.encryption_type == "KMS"` store layer
130    /// blobs encrypted under the configured CMK via
131    /// `fakecloud_kms::api::encrypt_blob` / `decrypt_blob`.
132    kms_state: Option<fakecloud_kms::state::SharedKmsState>,
133}
134
135impl EcrService {
136    pub fn new(state: SharedEcrState) -> Self {
137        Self {
138            state,
139            snapshot_store: None,
140            snapshot_lock: Arc::new(AsyncMutex::new(())),
141            kms_state: None,
142        }
143    }
144
145    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
146        self.snapshot_store = Some(store);
147        self
148    }
149
150    pub fn with_kms(mut self, kms: fakecloud_kms::state::SharedKmsState) -> Self {
151        self.kms_state = Some(kms);
152        self
153    }
154
155    /// Read-only accessor for the multi-account state. The sibling
156    /// `oci` module owns the HTTP-layer adapter for the OCI v2
157    /// protocol and needs to reach the same repositories + blobs the
158    /// JSON control-plane ops read and write.
159    pub fn state_handle(&self) -> &SharedEcrState {
160        &self.state
161    }
162
163    /// Handle for the shared KMS state when wired. `None` skips the
164    /// encrypt/decrypt paths and stores / returns plaintext blobs.
165    pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::state::SharedKmsState> {
166        self.kms_state.as_ref()
167    }
168
169    async fn save_snapshot(&self) {
170        Self::save_snapshot_with(
171            self.state.clone(),
172            self.snapshot_store.clone(),
173            self.snapshot_lock.clone(),
174        )
175        .await
176    }
177
178    /// Snapshot writer reachable from background tasks (e.g. the async
179    /// image-scanner) that don't hold a `&self` reference. Equivalent
180    /// to [`save_snapshot`] but takes the components as owned clones.
181    pub(crate) async fn save_snapshot_with(
182        state: SharedEcrState,
183        store: Option<Arc<dyn SnapshotStore>>,
184        lock: Arc<AsyncMutex<()>>,
185    ) {
186        let Some(store) = store else {
187            return;
188        };
189        let _guard = lock.lock().await;
190        let snapshot = EcrSnapshot {
191            schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
192            accounts: Some(state.read().clone()),
193        };
194        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
195            let bytes = serde_json::to_vec(&snapshot)
196                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
197            store.save(&bytes)
198        })
199        .await;
200        match join {
201            Ok(Ok(())) => {}
202            Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
203            Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
204        }
205    }
206}
207
208#[async_trait]
209impl AwsService for EcrService {
210    fn service_name(&self) -> &str {
211        "ecr"
212    }
213
214    async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
215        // OCI v2 Distribution requests come in as path-only REST
216        // (`/v2/...` with no `X-Amz-Target`). Dispatch them before the
217        // JSON control plane. Blob upload PATCH/PUT consume the raw
218        // body stream directly via a per-upload spool file — they
219        // never call `drain_request_stream`, so a 1 GiB layer push
220        // moves through fakecloud in constant memory. Other OCI routes
221        // (manifest PUT, blob HEAD/GET, …) keep using `request.body`
222        // so we drain the stream conditionally.
223        if request
224            .path_segments
225            .first()
226            .map(|s| s == "v2")
227            .unwrap_or(false)
228        {
229            // Drain unless this is a blob-upload PATCH/PUT — those
230            // routes own the streaming consumer.
231            let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
232                && request.path_segments.len() >= 5
233                && request.path_segments[request.path_segments.len() - 2] == "uploads";
234            if !is_blob_upload {
235                if let Some(stream) = request.take_body_stream() {
236                    request.body = fakecloud_core::service::drain_request_stream(stream).await?;
237                }
238            }
239            let result = crate::oci::dispatch(self, &request).await;
240            let mutates_oci = matches!(
241                request.method,
242                http::Method::POST | http::Method::PUT | http::Method::PATCH | http::Method::DELETE
243            );
244            if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
245                self.save_snapshot().await;
246            }
247            return result;
248        }
249
250        // JSON control plane: actions read `request.body`, so drain.
251        if let Some(stream) = request.take_body_stream() {
252            request.body = fakecloud_core::service::drain_request_stream(stream).await?;
253        }
254
255        let mutates = is_mutating(request.action.as_str());
256        let result = match request.action.as_str() {
257            "CreateRepository" => self.create_repository(&request),
258            "DeleteRepository" => self.delete_repository(&request),
259            "DescribeRepositories" => self.describe_repositories(&request),
260            "PutImageTagMutability" => self.put_image_tag_mutability(&request),
261            "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
262            "SetRepositoryPolicy" => self.set_repository_policy(&request),
263            "GetRepositoryPolicy" => self.get_repository_policy(&request),
264            "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
265            "TagResource" => self.tag_resource(&request),
266            "UntagResource" => self.untag_resource(&request),
267            "ListTagsForResource" => self.list_tags_for_resource(&request),
268            "PutImage" => self.put_image(&request),
269            "BatchGetImage" => self.batch_get_image(&request),
270            "BatchDeleteImage" => self.batch_delete_image(&request),
271            "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
272            "DescribeImages" => self.describe_images(&request),
273            "ListImages" => self.list_images(&request),
274            "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
275            "InitiateLayerUpload" => self.initiate_layer_upload(&request),
276            "UploadLayerPart" => self.upload_layer_part(&request),
277            "CompleteLayerUpload" => self.complete_layer_upload(&request),
278            "GetAuthorizationToken" => self.get_authorization_token(&request),
279            "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
280            "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
281            "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
282            "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
283            "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
284            "StartImageScan" => self.start_image_scan(&request),
285            "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
286            "DescribeRegistry" => self.describe_registry(&request),
287            "GetRegistryPolicy" => self.get_registry_policy(&request),
288            "PutRegistryPolicy" => self.put_registry_policy(&request),
289            "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
290            "GetRegistryScanningConfiguration" => {
291                self.get_registry_scanning_configuration(&request)
292            }
293            "PutRegistryScanningConfiguration" => {
294                self.put_registry_scanning_configuration(&request)
295            }
296            "BatchGetRepositoryScanningConfiguration" => {
297                self.batch_get_repository_scanning_configuration(&request)
298            }
299            "PutReplicationConfiguration" => self.put_replication_configuration(&request),
300            "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
301            "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
302            "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
303            "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
304            "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
305            "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
306            "GetAccountSetting" => self.get_account_setting(&request),
307            "PutAccountSetting" => self.put_account_setting(&request),
308            "CreateRepositoryCreationTemplate" => {
309                self.create_repository_creation_template(&request)
310            }
311            "DeleteRepositoryCreationTemplate" => {
312                self.delete_repository_creation_template(&request)
313            }
314            "DescribeRepositoryCreationTemplates" => {
315                self.describe_repository_creation_templates(&request)
316            }
317            "UpdateRepositoryCreationTemplate" => {
318                self.update_repository_creation_template(&request)
319            }
320            "GetSigningConfiguration" => self.get_signing_configuration(&request),
321            "PutSigningConfiguration" => self.put_signing_configuration(&request),
322            "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
323            "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
324            "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
325            "DeregisterPullTimeUpdateExclusion" => {
326                self.deregister_pull_time_update_exclusion(&request)
327            }
328            "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
329            "ListImageReferrers" => self.list_image_referrers(&request),
330            "UpdateImageStorageClass" => self.update_image_storage_class(&request),
331            _ => Err(AwsServiceError::action_not_implemented(
332                self.service_name(),
333                &request.action,
334            )),
335        };
336        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
337            self.save_snapshot().await;
338        }
339        result
340    }
341
342    fn supported_actions(&self) -> &[&str] {
343        SUPPORTED_ACTIONS
344    }
345}
346
347// -------- helpers --------
348
349fn req_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
350    body.get(field)
351        .and_then(|v| v.as_str())
352        .ok_or_else(|| invalid_parameter(format!("Missing required field: {field}")))
353}
354
355fn opt_str<'a>(body: &'a Value, field: &str) -> Option<&'a str> {
356    body.get(field).and_then(|v| v.as_str())
357}
358
359fn invalid_parameter(message: impl Into<String>) -> AwsServiceError {
360    AwsServiceError::aws_error(
361        StatusCode::BAD_REQUEST,
362        "InvalidParameterException",
363        message,
364    )
365}
366
367fn repository_not_found(name: &str) -> AwsServiceError {
368    AwsServiceError::aws_error(
369        StatusCode::BAD_REQUEST,
370        "RepositoryNotFoundException",
371        format!(
372            "The repository with name '{name}' does not exist in the registry with id '{registry}'",
373            name = name,
374            registry = "",
375        ),
376    )
377}
378
379fn repository_already_exists(name: &str) -> AwsServiceError {
380    AwsServiceError::aws_error(
381        StatusCode::BAD_REQUEST,
382        "RepositoryAlreadyExistsException",
383        format!("The repository with name '{name}' already exists in the registry."),
384    )
385}
386
387fn repository_policy_not_found(name: &str) -> AwsServiceError {
388    AwsServiceError::aws_error(
389        StatusCode::BAD_REQUEST,
390        "RepositoryPolicyNotFoundException",
391        format!("Repository policy does not exist for the repository with name '{name}'."),
392    )
393}
394
395/// Validate ECR repository name against AWS pattern:
396/// `(?:[a-z0-9]+(?:[._-][a-z0-9]+)*/)*[a-z0-9]+(?:[._-][a-z0-9]+)*`, length 2–256.
397/// Each `/`-separated segment starts and ends with `[a-z0-9]` and uses
398/// `[._-]` only between alphanum runs.
399fn validate_repository_name(name: &str) -> Result<(), AwsServiceError> {
400    let invalid = || {
401        invalid_parameter(format!(
402            "Invalid parameter at 'repositoryName': '{name}' failed to satisfy constraint: \
403             'must satisfy regular expression pattern: (?:[a-z0-9]+(?:[._-][a-z0-9]+)*/)*[a-z0-9]+(?:[._-][a-z0-9]+)*'",
404        ))
405    };
406    if name.len() < 2 || name.len() > 256 {
407        return Err(invalid());
408    }
409    // Segments split by `/`. Empty segment (e.g. `foo/`, `foo//bar`,
410    // leading/trailing slash) is disallowed.
411    for segment in name.split('/') {
412        if segment.is_empty() {
413            return Err(invalid());
414        }
415        // Segment := alphanum+ ([._-] alphanum+)*
416        let bytes = segment.as_bytes();
417        let mut i = 0usize;
418        // Leading alphanum run (at least 1 byte).
419        if !is_alnum(bytes[0]) {
420            return Err(invalid());
421        }
422        while i < bytes.len() && is_alnum(bytes[i]) {
423            i += 1;
424        }
425        while i < bytes.len() {
426            // Separator.
427            if !matches!(bytes[i], b'.' | b'_' | b'-') {
428                return Err(invalid());
429            }
430            i += 1;
431            // Required alphanum run after each separator.
432            if i >= bytes.len() || !is_alnum(bytes[i]) {
433                return Err(invalid());
434            }
435            while i < bytes.len() && is_alnum(bytes[i]) {
436                i += 1;
437            }
438        }
439    }
440    Ok(())
441}
442
443fn is_alnum(b: u8) -> bool {
444    b.is_ascii_lowercase() || b.is_ascii_digit()
445}
446
447fn parse_tags(body: &Value) -> Vec<(String, String)> {
448    body.get("tags")
449        .and_then(|v| v.as_array())
450        .map(|arr| {
451            arr.iter()
452                .filter_map(|t| {
453                    let k = t.get("Key").and_then(|v| v.as_str())?;
454                    let v = t.get("Value").and_then(|v| v.as_str()).unwrap_or("");
455                    Some((k.to_string(), v.to_string()))
456                })
457                .collect()
458        })
459        .unwrap_or_default()
460}
461
462/// Resolve the account to scope this request to. ECR inputs use
463/// `registryId` to address another account; absent means caller's
464/// account. We mirror the cross-service pattern: if `registryId` is
465/// present and different, the caller must have cross-account trust —
466/// but for CRUD ops we only need to pick the right state entry.
467fn target_account_id(request: &AwsRequest, body: &Value) -> String {
468    if let Some(id) = body.get("registryId").and_then(|v| v.as_str()) {
469        if !id.is_empty() {
470            return id.to_string();
471        }
472    }
473    request.account_id.clone()
474}
475
476fn repository_to_json(repo: &Repository) -> Value {
477    json!({
478        "repositoryArn": repo.repository_arn,
479        "registryId": repo.registry_id,
480        "repositoryName": repo.repository_name,
481        "repositoryUri": repo.repository_uri,
482        "createdAt": repo.created_at.timestamp(),
483        "imageTagMutability": repo.image_tag_mutability,
484        "imageScanningConfiguration": {
485            "scanOnPush": repo.image_scanning_configuration.scan_on_push,
486        },
487        "encryptionConfiguration": encryption_config_json(&repo.encryption_configuration),
488    })
489}
490
491fn encryption_config_json(cfg: &EncryptionConfiguration) -> Value {
492    let mut map = Map::new();
493    map.insert("encryptionType".into(), json!(cfg.encryption_type));
494    if let Some(kms) = &cfg.kms_key {
495        map.insert("kmsKey".into(), json!(kms));
496    }
497    Value::Object(map)
498}
499
500/// Decode an ECR resource ARN into `(account_id, repository_name)`.
501/// Accepts either a full ARN (`arn:aws:ecr:region:account:repository/name`)
502/// or a bare repository name for request bodies that accept both.
503fn decode_resource_arn(arn: &str) -> Result<(Option<String>, String), AwsServiceError> {
504    if let Some(rest) = arn.strip_prefix("arn:aws:ecr:") {
505        let mut parts = rest.splitn(4, ':');
506        let _region = parts
507            .next()
508            .ok_or_else(|| invalid_parameter("Malformed resource ARN"))?;
509        let account = parts
510            .next()
511            .ok_or_else(|| invalid_parameter("Malformed resource ARN"))?;
512        let resource = parts
513            .next()
514            .ok_or_else(|| invalid_parameter("Malformed resource ARN"))?;
515        let repo = resource
516            .strip_prefix("repository/")
517            .ok_or_else(|| invalid_parameter("Resource ARN must reference a repository"))?;
518        Ok((Some(account.to_string()), repo.to_string()))
519    } else {
520        Ok((None, arn.to_string()))
521    }
522}
523
524// -------- operations --------
525
526impl EcrService {
527    fn create_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
528        let body = request.json_body();
529        let name = req_str(&body, "repositoryName")?.to_string();
530        validate_repository_name(&name)?;
531        let image_tag_mutability = opt_str(&body, "imageTagMutability")
532            .unwrap_or("MUTABLE")
533            .to_string();
534        if image_tag_mutability != "MUTABLE" && image_tag_mutability != "IMMUTABLE" {
535            return Err(invalid_parameter(format!(
536                "Invalid value for imageTagMutability: {image_tag_mutability}"
537            )));
538        }
539        let scan_on_push = body
540            .get("imageScanningConfiguration")
541            .and_then(|v| v.get("scanOnPush"))
542            .and_then(|v| v.as_bool())
543            .unwrap_or(false);
544        let encryption = body
545            .get("encryptionConfiguration")
546            .map(|v| EncryptionConfiguration {
547                encryption_type: v
548                    .get("encryptionType")
549                    .and_then(|x| x.as_str())
550                    .unwrap_or("AES256")
551                    .to_string(),
552                kms_key: v
553                    .get("kmsKey")
554                    .and_then(|x| x.as_str())
555                    .map(|s| s.to_string()),
556            })
557            .unwrap_or_default();
558        let tags = parse_tags(&body);
559
560        let account = target_account_id(request, &body);
561        let mut accounts = self.state.write();
562        let endpoint = accounts.endpoint().to_string();
563        let state = accounts.get_or_create(&account);
564        if state.repositories.contains_key(&name) {
565            return Err(repository_already_exists(&name));
566        }
567        let arn = state.repository_arn(&name);
568        let mut repo = Repository::new(&name, arn, state.registry_id(), &endpoint);
569        repo.image_tag_mutability = image_tag_mutability;
570        repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
571        repo.encryption_configuration = encryption;
572        for (k, v) in tags {
573            repo.tags.insert(k, v);
574        }
575        let response = repository_to_json(&repo);
576        state.repositories.insert(name.clone(), repo);
577        Ok(AwsResponse::ok_json(json!({ "repository": response })))
578    }
579
580    fn delete_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
581        let body = request.json_body();
582        let name = req_str(&body, "repositoryName")?.to_string();
583        let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
584        let account = target_account_id(request, &body);
585
586        let mut accounts = self.state.write();
587        let state = accounts
588            .get_mut(&account)
589            .ok_or_else(|| repository_not_found(&name))?;
590        let repo = state
591            .repositories
592            .get(&name)
593            .ok_or_else(|| repository_not_found(&name))?;
594        // Repository-image state lands in Batch 2; until then, nothing
595        // to block the delete, so `force` is accepted but noop-ish.
596        let _ = force;
597        let snapshot = repository_to_json(repo);
598        state.repositories.remove(&name);
599        Ok(AwsResponse::ok_json(json!({ "repository": snapshot })))
600    }
601
602    fn describe_repositories(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603        // AWS's documented default page size for DescribeRepositories.
604        const DEFAULT_PAGE_SIZE: usize = 100;
605        let body = request.json_body();
606        let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
607            Some(n) => {
608                // Smithy @range(min=1, max=1000) on DescribeRepositories.maxResults.
609                if !(1..=1000).contains(&n) {
610                    return Err(invalid_parameter(format!(
611                        "Value '{n}' at 'maxResults' failed to satisfy constraint: \
612                         Member must have value between 1 and 1000",
613                    )));
614                }
615                n as usize
616            }
617            None => DEFAULT_PAGE_SIZE,
618        };
619        let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
620            Some(raw) => raw.parse::<usize>().map_err(|_| {
621                AwsServiceError::aws_error(
622                    StatusCode::BAD_REQUEST,
623                    "InvalidContinuationTokenException",
624                    "The specified continuation token is not valid.",
625                )
626            })?,
627            None => 0,
628        };
629        let names: Vec<String> = body
630            .get("repositoryNames")
631            .and_then(|v| v.as_array())
632            .map(|arr| {
633                arr.iter()
634                    .filter_map(|v| v.as_str().map(str::to_string))
635                    .collect()
636            })
637            .unwrap_or_default();
638        let account = target_account_id(request, &body);
639        let accounts = self.state.read();
640        let Some(state) = accounts.get(&account) else {
641            return Ok(AwsResponse::ok_json(json!({ "repositories": [] })));
642        };
643        let mut out: Vec<Value> = Vec::new();
644        let mut next_token: Option<String> = None;
645        if names.is_empty() {
646            let all: Vec<&Repository> = state.repositories.values().collect();
647            let start = offset.min(all.len());
648            let end = (start + max_results).min(all.len());
649            for repo in &all[start..end] {
650                out.push(repository_to_json(repo));
651            }
652            if end < all.len() {
653                next_token = Some(end.to_string());
654            }
655        } else {
656            for n in &names {
657                let repo = state
658                    .repositories
659                    .get(n)
660                    .ok_or_else(|| repository_not_found(n))?;
661                out.push(repository_to_json(repo));
662            }
663        }
664        let mut response = json!({ "repositories": out });
665        if let Some(token) = next_token {
666            response["nextToken"] = json!(token);
667        }
668        Ok(AwsResponse::ok_json(response))
669    }
670
671    fn put_image_tag_mutability(
672        &self,
673        request: &AwsRequest,
674    ) -> Result<AwsResponse, AwsServiceError> {
675        let body = request.json_body();
676        let name = req_str(&body, "repositoryName")?.to_string();
677        let mutability = req_str(&body, "imageTagMutability")?.to_string();
678        if mutability != "MUTABLE" && mutability != "IMMUTABLE" {
679            return Err(invalid_parameter(format!(
680                "Invalid value for imageTagMutability: {mutability}"
681            )));
682        }
683        let account = target_account_id(request, &body);
684        let mut accounts = self.state.write();
685        let state = accounts
686            .get_mut(&account)
687            .ok_or_else(|| repository_not_found(&name))?;
688        let repo = state
689            .repositories
690            .get_mut(&name)
691            .ok_or_else(|| repository_not_found(&name))?;
692        repo.image_tag_mutability = mutability.clone();
693        let registry_id = repo.registry_id.clone();
694        Ok(AwsResponse::ok_json(json!({
695            "registryId": registry_id,
696            "repositoryName": name,
697            "imageTagMutability": mutability,
698        })))
699    }
700
701    fn put_image_scanning_configuration(
702        &self,
703        request: &AwsRequest,
704    ) -> Result<AwsResponse, AwsServiceError> {
705        let body = request.json_body();
706        let name = req_str(&body, "repositoryName")?.to_string();
707        let scan_on_push = body
708            .get("imageScanningConfiguration")
709            .and_then(|v| v.get("scanOnPush"))
710            .and_then(|v| v.as_bool())
711            .ok_or_else(|| invalid_parameter("Missing imageScanningConfiguration.scanOnPush"))?;
712        let account = target_account_id(request, &body);
713        let mut accounts = self.state.write();
714        let state = accounts
715            .get_mut(&account)
716            .ok_or_else(|| repository_not_found(&name))?;
717        let repo = state
718            .repositories
719            .get_mut(&name)
720            .ok_or_else(|| repository_not_found(&name))?;
721        repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
722        let registry_id = repo.registry_id.clone();
723        Ok(AwsResponse::ok_json(json!({
724            "registryId": registry_id,
725            "repositoryName": name,
726            "imageScanningConfiguration": { "scanOnPush": scan_on_push },
727        })))
728    }
729
730    fn set_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
731        let body = request.json_body();
732        let name = req_str(&body, "repositoryName")?.to_string();
733        let policy_text = req_str(&body, "policyText")?.to_string();
734        let account = target_account_id(request, &body);
735        let mut accounts = self.state.write();
736        let state = accounts
737            .get_mut(&account)
738            .ok_or_else(|| repository_not_found(&name))?;
739        let repo = state
740            .repositories
741            .get_mut(&name)
742            .ok_or_else(|| repository_not_found(&name))?;
743        repo.policy = Some(policy_text.clone());
744        let registry_id = repo.registry_id.clone();
745        Ok(AwsResponse::ok_json(json!({
746            "registryId": registry_id,
747            "repositoryName": name,
748            "policyText": policy_text,
749        })))
750    }
751
752    fn get_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
753        let body = request.json_body();
754        let name = req_str(&body, "repositoryName")?.to_string();
755        let account = target_account_id(request, &body);
756        let accounts = self.state.read();
757        let state = accounts
758            .get(&account)
759            .ok_or_else(|| repository_not_found(&name))?;
760        let repo = state
761            .repositories
762            .get(&name)
763            .ok_or_else(|| repository_not_found(&name))?;
764        let policy = repo
765            .policy
766            .clone()
767            .ok_or_else(|| repository_policy_not_found(&name))?;
768        Ok(AwsResponse::ok_json(json!({
769            "registryId": repo.registry_id,
770            "repositoryName": name,
771            "policyText": policy,
772        })))
773    }
774
775    fn delete_repository_policy(
776        &self,
777        request: &AwsRequest,
778    ) -> Result<AwsResponse, AwsServiceError> {
779        let body = request.json_body();
780        let name = req_str(&body, "repositoryName")?.to_string();
781        let account = target_account_id(request, &body);
782        let mut accounts = self.state.write();
783        let state = accounts
784            .get_mut(&account)
785            .ok_or_else(|| repository_not_found(&name))?;
786        let repo = state
787            .repositories
788            .get_mut(&name)
789            .ok_or_else(|| repository_not_found(&name))?;
790        let policy = repo
791            .policy
792            .take()
793            .ok_or_else(|| repository_policy_not_found(&name))?;
794        let registry_id = repo.registry_id.clone();
795        Ok(AwsResponse::ok_json(json!({
796            "registryId": registry_id,
797            "repositoryName": name,
798            "policyText": policy,
799        })))
800    }
801
802    fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
803        let body = request.json_body();
804        let arn = req_str(&body, "resourceArn")?.to_string();
805        let (arn_account, name) = decode_resource_arn(&arn)?;
806        let tags = parse_tags(&body);
807        let account = arn_account.unwrap_or_else(|| request.account_id.clone());
808        let mut accounts = self.state.write();
809        let state = accounts
810            .get_mut(&account)
811            .ok_or_else(|| repository_not_found(&name))?;
812        let repo = state
813            .repositories
814            .get_mut(&name)
815            .ok_or_else(|| repository_not_found(&name))?;
816        for (k, v) in tags {
817            repo.tags.insert(k, v);
818        }
819        Ok(AwsResponse::ok_json(json!({})))
820    }
821
822    fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
823        let body = request.json_body();
824        let arn = req_str(&body, "resourceArn")?.to_string();
825        let (arn_account, name) = decode_resource_arn(&arn)?;
826        let keys: Vec<String> = body
827            .get("tagKeys")
828            .and_then(|v| v.as_array())
829            .map(|arr| {
830                arr.iter()
831                    .filter_map(|v| v.as_str().map(str::to_string))
832                    .collect()
833            })
834            .unwrap_or_default();
835        let account = arn_account.unwrap_or_else(|| request.account_id.clone());
836        let mut accounts = self.state.write();
837        let state = accounts
838            .get_mut(&account)
839            .ok_or_else(|| repository_not_found(&name))?;
840        let repo = state
841            .repositories
842            .get_mut(&name)
843            .ok_or_else(|| repository_not_found(&name))?;
844        for k in keys {
845            repo.tags.remove(&k);
846        }
847        Ok(AwsResponse::ok_json(json!({})))
848    }
849
850    fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
851        let body = request.json_body();
852        let arn = req_str(&body, "resourceArn")?.to_string();
853        let (arn_account, name) = decode_resource_arn(&arn)?;
854        let account = arn_account.unwrap_or_else(|| request.account_id.clone());
855        let accounts = self.state.read();
856        let state = accounts
857            .get(&account)
858            .ok_or_else(|| repository_not_found(&name))?;
859        let repo = state
860            .repositories
861            .get(&name)
862            .ok_or_else(|| repository_not_found(&name))?;
863        let tags: Vec<Value> = repo
864            .tags
865            .iter()
866            .map(|(k, v)| json!({ "Key": k, "Value": v }))
867            .collect();
868        Ok(AwsResponse::ok_json(json!({ "tags": tags })))
869    }
870}
871
872// -------- image + layer helpers --------
873
874fn image_not_found(repo: &str, id: &Value) -> AwsServiceError {
875    AwsServiceError::aws_error(
876        StatusCode::BAD_REQUEST,
877        "ImageNotFoundException",
878        format!("The image with imageId {{{id}}} does not exist within the repository with name '{repo}'"),
879    )
880}
881
882fn layer_not_found(digest: &str, repo: &str) -> AwsServiceError {
883    AwsServiceError::aws_error(
884        StatusCode::BAD_REQUEST,
885        "LayersNotFoundException",
886        format!(
887            "The layers with layerDigests '[{digest}]' do not exist in the repository with name '{repo}'"
888        ),
889    )
890}
891
892fn upload_not_found(upload_id: &str) -> AwsServiceError {
893    AwsServiceError::aws_error(
894        StatusCode::BAD_REQUEST,
895        "UploadNotFoundException",
896        format!("The upload '{upload_id}' does not exist."),
897    )
898}
899
900fn image_already_exists(repo: &str, tag: &str) -> AwsServiceError {
901    AwsServiceError::aws_error(
902        StatusCode::BAD_REQUEST,
903        "ImageAlreadyExistsException",
904        format!(
905            "Image with tag '{tag}' in repository '{repo}' already exists with a different digest and tag mutability is set to IMMUTABLE."
906        ),
907    )
908}
909
910fn invalid_layer(message: impl Into<String>) -> AwsServiceError {
911    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, "InvalidLayerException", message)
912}
913
914fn sha256_digest(bytes: &[u8]) -> String {
915    let mut hasher = Sha256::new();
916    hasher.update(bytes);
917    format!("sha256:{:x}", hasher.finalize())
918}
919
920fn image_id_for(image: &Image, tag: Option<&str>) -> Value {
921    let mut id = json!({ "imageDigest": image.image_digest });
922    if let Some(t) = tag {
923        id["imageTag"] = json!(t);
924    }
925    id
926}
927
928fn image_to_details(repo: &Repository, image: &Image, registry_id: &str) -> Value {
929    // All tags pointing at this digest.
930    let tags: Vec<&str> = repo
931        .image_tags
932        .iter()
933        .filter(|(_, d)| d.as_str() == image.image_digest)
934        .map(|(t, _)| t.as_str())
935        .collect();
936    let mut out = json!({
937        "registryId": registry_id,
938        "repositoryName": repo.repository_name,
939        "imageDigest": image.image_digest,
940        "imageTags": tags,
941        "imageSizeInBytes": image.image_size_in_bytes,
942        "imagePushedAt": image.image_pushed_at.timestamp(),
943        "imageManifestMediaType": image.image_manifest_media_type,
944    });
945    if let Some(a) = &image.artifact_media_type {
946        out["artifactMediaType"] = json!(a);
947    }
948    if let Some(t) = image.last_recorded_pull_time {
949        out["lastRecordedPullTime"] = json!(t.timestamp());
950    }
951    out
952}
953
954/// Return only the layer blobs referenced by the manifest of `image_digest`.
955/// Falls back to all layers if the manifest can't be parsed (e.g. an
956/// OCI-spec image that fakecloud stored in an unfamiliar shape) so the
957/// scan still runs against something. Layers not stored locally are
958/// silently skipped — the scanner only sees what's actually there.
959fn layers_for_image(repo: &Repository, image_digest: &str) -> Vec<crate::state::Layer> {
960    let Some(image) = repo.images.get(image_digest) else {
961        return Vec::new();
962    };
963    let Ok(manifest): Result<Value, _> = serde_json::from_str(&image.image_manifest) else {
964        return repo.layers.values().cloned().collect();
965    };
966    let mut digests: Vec<String> = Vec::new();
967    if let Some(arr) = manifest.get("layers").and_then(|v| v.as_array()) {
968        for layer in arr {
969            if let Some(d) = layer.get("digest").and_then(|v| v.as_str()) {
970                digests.push(d.to_string());
971            }
972        }
973    }
974    digests
975        .into_iter()
976        .filter_map(|d| repo.layers.get(&d).cloned())
977        .collect()
978}
979
980/// Resolve `imageId` into a stored digest for this repo. Accepts either
981/// `{imageDigest}` or `{imageTag}` (or both — digest wins when both set).
982fn resolve_image_digest(repo: &Repository, image_id: &Value) -> Option<String> {
983    if let Some(d) = image_id.get("imageDigest").and_then(|v| v.as_str()) {
984        if repo.images.contains_key(d) {
985            return Some(d.to_string());
986        }
987        return None;
988    }
989    if let Some(tag) = image_id.get("imageTag").and_then(|v| v.as_str()) {
990        return repo.image_tags.get(tag).cloned();
991    }
992    None
993}
994
995// -------- image + layer operations --------
996
997impl EcrService {
998    fn put_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
999        let body = request.json_body();
1000        let name = req_str(&body, "repositoryName")?.to_string();
1001        let manifest = req_str(&body, "imageManifest")?.to_string();
1002        let manifest_media_type = opt_str(&body, "imageManifestMediaType")
1003            .unwrap_or("application/vnd.docker.distribution.manifest.v2+json")
1004            .to_string();
1005        let supplied_tag = opt_str(&body, "imageTag").map(|s| s.to_string());
1006        let supplied_digest = opt_str(&body, "imageDigest").map(|s| s.to_string());
1007        let account = target_account_id(request, &body);
1008
1009        let computed_digest = sha256_digest(manifest.as_bytes());
1010        if let Some(ref supplied) = supplied_digest {
1011            if supplied != &computed_digest {
1012                return Err(AwsServiceError::aws_error(
1013                    StatusCode::BAD_REQUEST,
1014                    "ImageDigestDoesNotMatchException",
1015                    format!(
1016                        "The imageDigest '{supplied}' does not match the digest of the uploaded manifest ('{computed_digest}')."
1017                    ),
1018                ));
1019            }
1020        }
1021        let digest = supplied_digest.unwrap_or_else(|| computed_digest.clone());
1022
1023        let mut accounts = self.state.write();
1024        let state = accounts
1025            .get_mut(&account)
1026            .ok_or_else(|| repository_not_found(&name))?;
1027        let repo = state
1028            .repositories
1029            .get_mut(&name)
1030            .ok_or_else(|| repository_not_found(&name))?;
1031
1032        // Immutable tag guard: if a tag is supplied, it already maps to
1033        // a different digest, and the repo is IMMUTABLE, reject.
1034        if let Some(ref tag) = supplied_tag {
1035            if let Some(existing) = repo.image_tags.get(tag) {
1036                if existing != &digest && repo.image_tag_mutability == "IMMUTABLE" {
1037                    return Err(image_already_exists(&name, tag));
1038                }
1039            }
1040        }
1041
1042        let image_entry = repo.images.entry(digest.clone()).or_insert_with(|| Image {
1043            image_digest: digest.clone(),
1044            image_manifest: manifest.clone(),
1045            image_manifest_media_type: manifest_media_type.clone(),
1046            artifact_media_type: None,
1047            image_size_in_bytes: manifest.len() as u64,
1048            image_pushed_at: Utc::now(),
1049            last_recorded_pull_time: None,
1050        });
1051        // If the caller re-pushes an existing digest with a new manifest
1052        // payload (shouldn't happen under sha256 addressing but tolerate
1053        // it), keep the latest manifest.
1054        image_entry.image_manifest = manifest;
1055        image_entry.image_manifest_media_type = manifest_media_type.clone();
1056
1057        if let Some(tag) = supplied_tag.clone() {
1058            repo.image_tags.insert(tag, digest.clone());
1059        }
1060
1061        let snapshot = repo.images.get(&digest).cloned().unwrap();
1062        let tag_ref = supplied_tag.as_deref();
1063        Ok(AwsResponse::ok_json(json!({
1064            "image": {
1065                "registryId": repo.registry_id,
1066                "repositoryName": name,
1067                "imageId": image_id_for(&snapshot, tag_ref),
1068                "imageManifest": snapshot.image_manifest,
1069                "imageManifestMediaType": snapshot.image_manifest_media_type,
1070            }
1071        })))
1072    }
1073
1074    fn batch_get_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1075        let body = request.json_body();
1076        let name = req_str(&body, "repositoryName")?.to_string();
1077        let ids = body
1078            .get("imageIds")
1079            .and_then(|v| v.as_array())
1080            .cloned()
1081            .unwrap_or_default();
1082        let account = target_account_id(request, &body);
1083        let accounts = self.state.read();
1084        let state = accounts
1085            .get(&account)
1086            .ok_or_else(|| repository_not_found(&name))?;
1087        let repo = state
1088            .repositories
1089            .get(&name)
1090            .ok_or_else(|| repository_not_found(&name))?;
1091
1092        let mut images: Vec<Value> = Vec::new();
1093        let mut failures: Vec<Value> = Vec::new();
1094        for id in &ids {
1095            match resolve_image_digest(repo, id) {
1096                Some(digest) => {
1097                    let img = repo.images.get(&digest).unwrap();
1098                    let tag = id.get("imageTag").and_then(|v| v.as_str());
1099                    images.push(json!({
1100                        "registryId": repo.registry_id,
1101                        "repositoryName": name,
1102                        "imageId": image_id_for(img, tag),
1103                        "imageManifest": img.image_manifest,
1104                        "imageManifestMediaType": img.image_manifest_media_type,
1105                    }));
1106                }
1107                None => failures.push(json!({
1108                    "imageId": id,
1109                    "failureCode": "ImageNotFound",
1110                    "failureReason": "Requested image not found",
1111                })),
1112            }
1113        }
1114        Ok(AwsResponse::ok_json(json!({
1115            "images": images,
1116            "failures": failures,
1117        })))
1118    }
1119
1120    fn batch_delete_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1121        let body = request.json_body();
1122        let name = req_str(&body, "repositoryName")?.to_string();
1123        let ids = body
1124            .get("imageIds")
1125            .and_then(|v| v.as_array())
1126            .cloned()
1127            .unwrap_or_default();
1128        let account = target_account_id(request, &body);
1129        let mut accounts = self.state.write();
1130        let state = accounts
1131            .get_mut(&account)
1132            .ok_or_else(|| repository_not_found(&name))?;
1133        let repo = state
1134            .repositories
1135            .get_mut(&name)
1136            .ok_or_else(|| repository_not_found(&name))?;
1137
1138        let mut deleted: Vec<Value> = Vec::new();
1139        let mut failures: Vec<Value> = Vec::new();
1140        for id in &ids {
1141            if let Some(tag) = id.get("imageTag").and_then(|v| v.as_str()) {
1142                // Delete by tag: remove only the tag, image stays if
1143                // other tags still reference the digest.
1144                if let Some(digest) = repo.image_tags.remove(tag) {
1145                    deleted.push(json!({ "imageDigest": digest, "imageTag": tag }));
1146                    let still_tagged = repo.image_tags.values().any(|d| *d == digest);
1147                    if !still_tagged {
1148                        repo.images.remove(&digest);
1149                    }
1150                    continue;
1151                }
1152                failures.push(json!({
1153                    "imageId": id,
1154                    "failureCode": "ImageNotFound",
1155                    "failureReason": "Requested image not found",
1156                }));
1157            } else if let Some(digest) = id.get("imageDigest").and_then(|v| v.as_str()) {
1158                if repo.images.remove(digest).is_some() {
1159                    repo.image_tags.retain(|_, d| d != digest);
1160                    deleted.push(json!({ "imageDigest": digest }));
1161                    continue;
1162                }
1163                failures.push(json!({
1164                    "imageId": id,
1165                    "failureCode": "ImageNotFound",
1166                    "failureReason": "Requested image not found",
1167                }));
1168            } else {
1169                failures.push(json!({
1170                    "imageId": id,
1171                    "failureCode": "InvalidImageTag",
1172                    "failureReason": "Either imageDigest or imageTag must be supplied",
1173                }));
1174            }
1175        }
1176        Ok(AwsResponse::ok_json(json!({
1177            "imageIds": deleted,
1178            "failures": failures,
1179        })))
1180    }
1181
1182    fn batch_check_layer_availability(
1183        &self,
1184        request: &AwsRequest,
1185    ) -> Result<AwsResponse, AwsServiceError> {
1186        let body = request.json_body();
1187        let name = req_str(&body, "repositoryName")?.to_string();
1188        let digests: Vec<String> = body
1189            .get("layerDigests")
1190            .and_then(|v| v.as_array())
1191            .map(|arr| {
1192                arr.iter()
1193                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1194                    .collect()
1195            })
1196            .unwrap_or_default();
1197        if digests.is_empty() {
1198            return Err(invalid_parameter(
1199                "At least one layerDigest must be supplied to BatchCheckLayerAvailability",
1200            ));
1201        }
1202        let account = target_account_id(request, &body);
1203        let accounts = self.state.read();
1204        let state = accounts
1205            .get(&account)
1206            .ok_or_else(|| repository_not_found(&name))?;
1207        let repo = state
1208            .repositories
1209            .get(&name)
1210            .ok_or_else(|| repository_not_found(&name))?;
1211        let mut layers: Vec<Value> = Vec::new();
1212        let mut failures: Vec<Value> = Vec::new();
1213        for digest in &digests {
1214            match repo.layers.get(digest) {
1215                Some(layer) => layers.push(json!({
1216                    "layerDigest": layer.digest,
1217                    "layerAvailability": "AVAILABLE",
1218                    "layerSize": layer.size,
1219                    "mediaType": layer.media_type,
1220                })),
1221                None => failures.push(json!({
1222                    "layerDigest": digest,
1223                    "failureCode": "MissingLayerDigest",
1224                    "failureReason": "Layer not found in repository",
1225                })),
1226            }
1227        }
1228        Ok(AwsResponse::ok_json(json!({
1229            "layers": layers,
1230            "failures": failures,
1231        })))
1232    }
1233
1234    fn describe_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1235        const DEFAULT_PAGE_SIZE: usize = 100;
1236        let body = request.json_body();
1237        let name = req_str(&body, "repositoryName")?.to_string();
1238        let ids = body
1239            .get("imageIds")
1240            .and_then(|v| v.as_array())
1241            .cloned()
1242            .unwrap_or_default();
1243        let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
1244            Some(n) => {
1245                if !(1..=1000).contains(&n) {
1246                    return Err(invalid_parameter(format!(
1247                        "Value '{n}' at 'maxResults' failed to satisfy constraint: \
1248                         Member must have value between 1 and 1000",
1249                    )));
1250                }
1251                n as usize
1252            }
1253            None => DEFAULT_PAGE_SIZE,
1254        };
1255        let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
1256            Some(raw) => raw.parse::<usize>().map_err(|_| {
1257                AwsServiceError::aws_error(
1258                    StatusCode::BAD_REQUEST,
1259                    "InvalidContinuationTokenException",
1260                    "The specified continuation token is not valid.",
1261                )
1262            })?,
1263            None => 0,
1264        };
1265        let account = target_account_id(request, &body);
1266        let accounts = self.state.read();
1267        let state = accounts
1268            .get(&account)
1269            .ok_or_else(|| repository_not_found(&name))?;
1270        let repo = state
1271            .repositories
1272            .get(&name)
1273            .ok_or_else(|| repository_not_found(&name))?;
1274
1275        let mut details: Vec<Value> = Vec::new();
1276        let mut next_token: Option<String> = None;
1277        if ids.is_empty() {
1278            let all: Vec<&Image> = repo.images.values().collect();
1279            let start = offset.min(all.len());
1280            let end = (start + max_results).min(all.len());
1281            for img in &all[start..end] {
1282                details.push(image_to_details(repo, img, &repo.registry_id));
1283            }
1284            if end < all.len() {
1285                next_token = Some(end.to_string());
1286            }
1287        } else {
1288            for id in &ids {
1289                let digest =
1290                    resolve_image_digest(repo, id).ok_or_else(|| image_not_found(&name, id))?;
1291                let img = repo.images.get(&digest).unwrap();
1292                details.push(image_to_details(repo, img, &repo.registry_id));
1293            }
1294        }
1295        let mut response = json!({ "imageDetails": details });
1296        if let Some(token) = next_token {
1297            response["nextToken"] = json!(token);
1298        }
1299        Ok(AwsResponse::ok_json(response))
1300    }
1301
1302    fn list_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1303        const DEFAULT_PAGE_SIZE: usize = 100;
1304        let body = request.json_body();
1305        let name = req_str(&body, "repositoryName")?.to_string();
1306        let filter_tag_status = body
1307            .get("filter")
1308            .and_then(|v| v.get("tagStatus"))
1309            .and_then(|v| v.as_str())
1310            .map(|s| s.to_string());
1311        let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
1312            Some(n) => {
1313                if !(1..=1000).contains(&n) {
1314                    return Err(invalid_parameter(format!(
1315                        "Value '{n}' at 'maxResults' failed to satisfy constraint: \
1316                         Member must have value between 1 and 1000",
1317                    )));
1318                }
1319                n as usize
1320            }
1321            None => DEFAULT_PAGE_SIZE,
1322        };
1323        let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
1324            Some(raw) => raw.parse::<usize>().map_err(|_| {
1325                AwsServiceError::aws_error(
1326                    StatusCode::BAD_REQUEST,
1327                    "InvalidContinuationTokenException",
1328                    "The specified continuation token is not valid.",
1329                )
1330            })?,
1331            None => 0,
1332        };
1333        let account = target_account_id(request, &body);
1334        let accounts = self.state.read();
1335        let state = accounts
1336            .get(&account)
1337            .ok_or_else(|| repository_not_found(&name))?;
1338        let repo = state
1339            .repositories
1340            .get(&name)
1341            .ok_or_else(|| repository_not_found(&name))?;
1342
1343        // Enumerate once per (digest, tag-or-untagged) combination.
1344        let mut all: Vec<(String, Option<String>)> = Vec::new();
1345        for (tag, digest) in &repo.image_tags {
1346            all.push((digest.clone(), Some(tag.clone())));
1347        }
1348        let tagged_digests: std::collections::HashSet<&String> = repo.image_tags.values().collect();
1349        for digest in repo.images.keys() {
1350            if !tagged_digests.contains(digest) {
1351                all.push((digest.clone(), None));
1352            }
1353        }
1354        // Apply filter.
1355        all.retain(|(_, tag)| match filter_tag_status.as_deref() {
1356            Some("TAGGED") => tag.is_some(),
1357            Some("UNTAGGED") => tag.is_none(),
1358            _ => true,
1359        });
1360        all.sort();
1361
1362        let start = offset.min(all.len());
1363        let end = (start + max_results).min(all.len());
1364        let ids: Vec<Value> = all[start..end]
1365            .iter()
1366            .map(|(d, t)| {
1367                let mut v = json!({ "imageDigest": d });
1368                if let Some(tag) = t {
1369                    v["imageTag"] = json!(tag);
1370                }
1371                v
1372            })
1373            .collect();
1374        let mut response = json!({ "imageIds": ids });
1375        if end < all.len() {
1376            response["nextToken"] = json!(end.to_string());
1377        }
1378        Ok(AwsResponse::ok_json(response))
1379    }
1380
1381    fn get_download_url_for_layer(
1382        &self,
1383        request: &AwsRequest,
1384    ) -> Result<AwsResponse, AwsServiceError> {
1385        let body = request.json_body();
1386        let name = req_str(&body, "repositoryName")?.to_string();
1387        let digest = req_str(&body, "layerDigest")?.to_string();
1388        let account = target_account_id(request, &body);
1389        let accounts = self.state.read();
1390        let state = accounts
1391            .get(&account)
1392            .ok_or_else(|| repository_not_found(&name))?;
1393        let repo = state
1394            .repositories
1395            .get(&name)
1396            .ok_or_else(|| repository_not_found(&name))?;
1397        if !repo.layers.contains_key(&digest) {
1398            return Err(layer_not_found(&digest, &name));
1399        }
1400        // Batch 3 will host `/v2/<name>/blobs/<digest>` — return that
1401        // path as a relative URL so callers that trust the endpoint
1402        // they're already talking to can resolve it.
1403        let endpoint = accounts.endpoint();
1404        let url = format!(
1405            "{}/v2/{}/blobs/{}",
1406            endpoint.trim_end_matches('/'),
1407            name,
1408            digest
1409        );
1410        Ok(AwsResponse::ok_json(json!({
1411            "downloadUrl": url,
1412            "layerDigest": digest,
1413        })))
1414    }
1415
1416    fn initiate_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1417        let body = request.json_body();
1418        let name = req_str(&body, "repositoryName")?.to_string();
1419        let account = target_account_id(request, &body);
1420        let mut accounts = self.state.write();
1421        let state = accounts
1422            .get_mut(&account)
1423            .ok_or_else(|| repository_not_found(&name))?;
1424        if !state.repositories.contains_key(&name) {
1425            return Err(repository_not_found(&name));
1426        }
1427        let upload_id = Uuid::new_v4().to_string();
1428        let spool = crate::oci::create_upload_spool(&upload_id).map_err(|e| {
1429            AwsServiceError::aws_error(
1430                StatusCode::INTERNAL_SERVER_ERROR,
1431                "InternalError",
1432                format!("failed to create upload spool: {e}"),
1433            )
1434        })?;
1435        state.layer_uploads.insert(
1436            upload_id.clone(),
1437            LayerUpload {
1438                upload_id: upload_id.clone(),
1439                repository_name: name,
1440                created_at: Utc::now(),
1441                spool_path: spool.to_string_lossy().to_string(),
1442                last_byte_received: 0,
1443            },
1444        );
1445        Ok(AwsResponse::ok_json(json!({
1446            "uploadId": upload_id,
1447            // Matches the real AWS default of 10 MiB.
1448            "partSize": 10_485_760u64,
1449        })))
1450    }
1451
1452    fn upload_layer_part(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1453        let body = request.json_body();
1454        let name = req_str(&body, "repositoryName")?.to_string();
1455        let upload_id = req_str(&body, "uploadId")?.to_string();
1456        let first_byte = body
1457            .get("partFirstByte")
1458            .and_then(|v| v.as_u64())
1459            .ok_or_else(|| invalid_parameter("Missing partFirstByte"))?;
1460        let last_byte = body
1461            .get("partLastByte")
1462            .and_then(|v| v.as_u64())
1463            .ok_or_else(|| invalid_parameter("Missing partLastByte"))?;
1464        let part_blob_b64 = req_str(&body, "layerPartBlob")?.to_string();
1465        let part_bytes = B64
1466            .decode(part_blob_b64.as_bytes())
1467            .map_err(|_| invalid_layer("layerPartBlob is not valid base64"))?;
1468        let account = target_account_id(request, &body);
1469        let mut accounts = self.state.write();
1470        let state = accounts
1471            .get_mut(&account)
1472            .ok_or_else(|| repository_not_found(&name))?;
1473        let upload = state
1474            .layer_uploads
1475            .get_mut(&upload_id)
1476            .ok_or_else(|| upload_not_found(&upload_id))?;
1477        if upload.repository_name != name {
1478            return Err(upload_not_found(&upload_id));
1479        }
1480        if first_byte != upload.last_byte_received {
1481            return Err(invalid_layer(format!(
1482                "Layer part upload out of order: expected partFirstByte {} got {}",
1483                upload.last_byte_received, first_byte,
1484            )));
1485        }
1486        let expected_len = last_byte
1487            .checked_sub(first_byte)
1488            .and_then(|d| d.checked_add(1))
1489            .ok_or_else(|| invalid_layer("partLastByte < partFirstByte"))?;
1490        if part_bytes.len() as u64 != expected_len {
1491            return Err(invalid_layer(format!(
1492                "Layer part size mismatch: bytes {} doesn't match range [{first_byte}, {last_byte}]",
1493                part_bytes.len()
1494            )));
1495        }
1496        let spool = std::path::PathBuf::from(&upload.spool_path);
1497        crate::oci::append_bytes_sync(&spool, &part_bytes).map_err(|e| {
1498            AwsServiceError::aws_error(
1499                StatusCode::INTERNAL_SERVER_ERROR,
1500                "InternalError",
1501                format!("failed to append upload chunk: {e}"),
1502            )
1503        })?;
1504        upload.last_byte_received = last_byte + 1;
1505        Ok(AwsResponse::ok_json(json!({
1506            "registryId": state.registry_id(),
1507            "repositoryName": name,
1508            "uploadId": upload_id,
1509            "lastByteReceived": last_byte,
1510        })))
1511    }
1512
1513    fn get_authorization_token(
1514        &self,
1515        request: &AwsRequest,
1516    ) -> Result<AwsResponse, AwsServiceError> {
1517        let body = request.json_body();
1518        let registry_ids: Vec<String> = body
1519            .get("registryIds")
1520            .and_then(|v| v.as_array())
1521            .map(|arr| {
1522                arr.iter()
1523                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1524                    .collect()
1525            })
1526            .unwrap_or_default();
1527        let accounts = self.state.read();
1528        let default_account = accounts.default_account_id().to_string();
1529        let targets = if registry_ids.is_empty() {
1530            vec![default_account]
1531        } else {
1532            registry_ids
1533        };
1534        let endpoint = accounts.endpoint().to_string();
1535        drop(accounts);
1536        let expires_at = (Utc::now() + chrono::Duration::hours(12)).timestamp();
1537        let authorization_data: Vec<Value> = targets
1538            .into_iter()
1539            .map(|_registry_id| {
1540                let token = B64.encode(format!("AWS:{}", Uuid::new_v4()).as_bytes());
1541                json!({
1542                    "authorizationToken": token,
1543                    "expiresAt": expires_at,
1544                    "proxyEndpoint": endpoint,
1545                })
1546            })
1547            .collect();
1548        Ok(AwsResponse::ok_json(json!({
1549            "authorizationData": authorization_data,
1550        })))
1551    }
1552
1553    fn complete_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1554        let body = request.json_body();
1555        let name = req_str(&body, "repositoryName")?.to_string();
1556        let upload_id = req_str(&body, "uploadId")?.to_string();
1557        let digests: Vec<String> = body
1558            .get("layerDigests")
1559            .and_then(|v| v.as_array())
1560            .map(|arr| {
1561                arr.iter()
1562                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
1563                    .collect()
1564            })
1565            .unwrap_or_default();
1566        if digests.is_empty() {
1567            return Err(invalid_parameter(
1568                "At least one layerDigest must be supplied to CompleteLayerUpload",
1569            ));
1570        }
1571        let account = target_account_id(request, &body);
1572        let mut accounts = self.state.write();
1573        let state = accounts
1574            .get_mut(&account)
1575            .ok_or_else(|| repository_not_found(&name))?;
1576        // Peek, validate, then commit — so a digest mismatch lets the
1577        // caller retry CompleteLayerUpload with the correct digest
1578        // instead of having to re-upload the entire blob.
1579        let upload = state
1580            .layer_uploads
1581            .get(&upload_id)
1582            .ok_or_else(|| upload_not_found(&upload_id))?;
1583        if upload.repository_name != name {
1584            return Err(upload_not_found(&upload_id));
1585        }
1586        let spool = std::path::PathBuf::from(&upload.spool_path);
1587        let blob_bytes = crate::oci::read_spool(&spool).map_err(|e| {
1588            AwsServiceError::aws_error(
1589                StatusCode::INTERNAL_SERVER_ERROR,
1590                "InternalError",
1591                format!("failed to read upload spool: {e}"),
1592            )
1593        })?;
1594        let computed = sha256_digest(&blob_bytes);
1595        if !digests.iter().any(|d| d == &computed) {
1596            // Spool stays — caller can retry with the correct digest
1597            // without re-uploading every UploadLayerPart chunk.
1598            return Err(AwsServiceError::aws_error(
1599                StatusCode::BAD_REQUEST,
1600                "LayerDigestMismatchException",
1601                format!(
1602                    "The layer digest from the client ({}) does not match the digest of the received bytes ({computed})",
1603                    digests.join(",")
1604                ),
1605            ));
1606        }
1607        let _upload = state.layer_uploads.remove(&upload_id).unwrap();
1608        crate::oci::unlink_spool(&spool);
1609        let size = blob_bytes.len() as u64;
1610        // Drop the write guard before the KMS encrypt call (which takes
1611        // its own lock). Re-acquire to insert.
1612        drop(accounts);
1613        let (stored_bytes, encrypted_with) =
1614            crate::oci::encrypt_layer_bytes(self, &account, &name, &blob_bytes);
1615        let mut accounts = self.state.write();
1616        let state = accounts
1617            .get_mut(&account)
1618            .ok_or_else(|| repository_not_found(&name))?;
1619        let repo = state
1620            .repositories
1621            .get_mut(&name)
1622            .ok_or_else(|| repository_not_found(&name))?;
1623        repo.layers.insert(
1624            computed.clone(),
1625            Layer {
1626                digest: computed.clone(),
1627                size,
1628                blob_b64: B64.encode(&stored_bytes),
1629                media_type: "application/vnd.docker.image.rootfs.diff.tar.gzip".to_string(),
1630                encrypted_with_kms_key: encrypted_with,
1631            },
1632        );
1633        let registry_id = repo.registry_id.clone();
1634        Ok(AwsResponse::ok_json(json!({
1635            "registryId": registry_id,
1636            "repositoryName": name,
1637            "uploadId": upload_id,
1638            "layerDigest": computed,
1639        })))
1640    }
1641}
1642
1643// -------- lifecycle + scan + registry + polish handlers (Batch 4) --------
1644
1645fn lifecycle_policy_not_found(name: &str) -> AwsServiceError {
1646    AwsServiceError::aws_error(
1647        StatusCode::BAD_REQUEST,
1648        "LifecyclePolicyNotFoundException",
1649        format!("Lifecycle policy does not exist for the repository with name '{name}'."),
1650    )
1651}
1652
1653fn registry_policy_not_found() -> AwsServiceError {
1654    AwsServiceError::aws_error(
1655        StatusCode::BAD_REQUEST,
1656        "RegistryPolicyNotFoundException",
1657        "The registry doesn't have an associated registry policy.",
1658    )
1659}
1660
1661/// Apply lifecycle-policy rules to this repo's stored images and
1662/// return the digests that should be pruned. Covers the four AWS
1663/// selection dimensions in use today: `tagStatus` (tagged/untagged/any),
1664/// `tagPrefixList`, `tagPatternList` (wildcard `*`), and `countType`
1665/// (`imageCountMoreThan` or `sinceImagePushed` with `countUnit=days`).
1666/// Rules run in ascending `rulePriority` order; later rules can't
1667/// re-prune images earlier rules already marked.
1668fn evaluate_lifecycle_policy(repo: &crate::state::Repository, policy: &str) -> Vec<String> {
1669    let Ok(doc) = serde_json::from_str::<Value>(policy) else {
1670        return Vec::new();
1671    };
1672    let Some(rules) = doc.get("rules").and_then(|v| v.as_array()) else {
1673        return Vec::new();
1674    };
1675    let mut to_delete: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
1676    // Sort rules by priority ascending (lower priority runs first
1677    // per AWS semantics).
1678    let mut sorted: Vec<&Value> = rules.iter().collect();
1679    sorted.sort_by_key(|r| r.get("rulePriority").and_then(|v| v.as_i64()).unwrap_or(0));
1680    for rule in sorted {
1681        let sel = rule.get("selection").cloned().unwrap_or(Value::Null);
1682        let tag_status = sel
1683            .get("tagStatus")
1684            .and_then(|v| v.as_str())
1685            .unwrap_or("any");
1686        let count_type = sel.get("countType").and_then(|v| v.as_str()).unwrap_or("");
1687        let count_number = sel.get("countNumber").and_then(|v| v.as_i64()).unwrap_or(0);
1688        let count_unit = sel
1689            .get("countUnit")
1690            .and_then(|v| v.as_str())
1691            .unwrap_or("days");
1692        let tag_prefix_list: Vec<String> = sel
1693            .get("tagPrefixList")
1694            .and_then(|v| v.as_array())
1695            .map(|arr| {
1696                arr.iter()
1697                    .filter_map(|v| v.as_str().map(String::from))
1698                    .collect()
1699            })
1700            .unwrap_or_default();
1701        let tag_pattern_list: Vec<String> = sel
1702            .get("tagPatternList")
1703            .and_then(|v| v.as_array())
1704            .map(|arr| {
1705                arr.iter()
1706                    .filter_map(|v| v.as_str().map(String::from))
1707                    .collect()
1708            })
1709            .unwrap_or_default();
1710
1711        // Per-image tag lookup: repo stores a tag -> digest map; invert
1712        // so we can ask "what tags point at this digest".
1713        let tags_for = |digest: &str| -> Vec<&str> {
1714            repo.image_tags
1715                .iter()
1716                .filter_map(|(t, d)| (d == digest).then_some(t.as_str()))
1717                .collect()
1718        };
1719
1720        // Candidate images, filtered by tagStatus + tagPrefixList +
1721        // tagPatternList. Per AWS, the tag filters only apply when
1722        // tagStatus=tagged.
1723        let mut candidates: Vec<&Image> = repo
1724            .images
1725            .values()
1726            .filter(|img| {
1727                let tags = tags_for(&img.image_digest);
1728                let has_tag = !tags.is_empty();
1729                match tag_status {
1730                    "tagged" => {
1731                        if !has_tag {
1732                            return false;
1733                        }
1734                        if !tag_prefix_list.is_empty()
1735                            && !tags
1736                                .iter()
1737                                .any(|t| tag_prefix_list.iter().any(|p| t.starts_with(p.as_str())))
1738                        {
1739                            return false;
1740                        }
1741                        if !tag_pattern_list.is_empty()
1742                            && !tags.iter().any(|t| {
1743                                tag_pattern_list
1744                                    .iter()
1745                                    .any(|p| wildcard_match(p.as_str(), t))
1746                            })
1747                        {
1748                            return false;
1749                        }
1750                        true
1751                    }
1752                    "untagged" => !has_tag,
1753                    _ => true,
1754                }
1755            })
1756            .filter(|img| !to_delete.contains(&img.image_digest))
1757            .collect();
1758        candidates.sort_by_key(|img| img.image_pushed_at);
1759        match count_type {
1760            "imageCountMoreThan" => {
1761                // Keep the newest N, prune the rest.
1762                let total = candidates.len() as i64;
1763                if total > count_number {
1764                    let prune_count = (total - count_number) as usize;
1765                    for img in candidates.into_iter().take(prune_count) {
1766                        to_delete.insert(img.image_digest.clone());
1767                    }
1768                }
1769            }
1770            "sinceImagePushed" => {
1771                let now = chrono::Utc::now();
1772                let delta = match count_unit {
1773                    "days" => chrono::Duration::days(count_number),
1774                    "hours" => chrono::Duration::hours(count_number),
1775                    _ => chrono::Duration::days(count_number),
1776                };
1777                let threshold = now - delta;
1778                for img in candidates {
1779                    if img.image_pushed_at < threshold {
1780                        to_delete.insert(img.image_digest.clone());
1781                    }
1782                }
1783            }
1784            _ => {}
1785        }
1786    }
1787    to_delete.into_iter().collect()
1788}
1789
1790/// AWS lifecycle `tagPatternList` supports `*` as a shell-style
1791/// wildcard. No regex metacharacters beyond `*`, no anchoring beyond
1792/// full-string match.
1793fn wildcard_match(pattern: &str, text: &str) -> bool {
1794    let parts: Vec<&str> = pattern.split('*').collect();
1795    if parts.len() == 1 {
1796        return parts[0] == text;
1797    }
1798    let mut rest = text;
1799    // Leading literal must match start if the pattern doesn't start
1800    // with a `*`.
1801    if let Some(first) = parts.first() {
1802        if !first.is_empty() {
1803            if !rest.starts_with(first) {
1804                return false;
1805            }
1806            rest = &rest[first.len()..];
1807        }
1808    }
1809    // Trailing literal must match end if the pattern doesn't end
1810    // with a `*`.
1811    let last_idx = parts.len() - 1;
1812    for (i, seg) in parts.iter().enumerate().skip(1) {
1813        if seg.is_empty() {
1814            continue;
1815        }
1816        if i == last_idx {
1817            if !rest.ends_with(seg) {
1818                return false;
1819            }
1820            rest = &rest[..rest.len() - seg.len()];
1821        } else if let Some(pos) = rest.find(seg) {
1822            rest = &rest[pos + seg.len()..];
1823        } else {
1824            return false;
1825        }
1826    }
1827    true
1828}
1829
1830impl EcrService {
1831    fn put_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1832        let body = request.json_body();
1833        let name = req_str(&body, "repositoryName")?.to_string();
1834        let policy = req_str(&body, "lifecyclePolicyText")?.to_string();
1835        // Parse sanity-check.
1836        serde_json::from_str::<Value>(&policy)
1837            .map_err(|_| invalid_parameter("lifecyclePolicyText is not valid JSON"))?;
1838        let account = target_account_id(request, &body);
1839        let mut accounts = self.state.write();
1840        let state = accounts
1841            .get_mut(&account)
1842            .ok_or_else(|| repository_not_found(&name))?;
1843        let repo = state
1844            .repositories
1845            .get_mut(&name)
1846            .ok_or_else(|| repository_not_found(&name))?;
1847        repo.lifecycle_policy = Some(policy.clone());
1848        // Apply immediately so the store reflects the policy.
1849        let prune = evaluate_lifecycle_policy(repo, &policy);
1850        for digest in &prune {
1851            repo.images.remove(digest);
1852            repo.image_tags.retain(|_, d| d != digest);
1853        }
1854        let registry_id = repo.registry_id.clone();
1855        Ok(AwsResponse::ok_json(json!({
1856            "registryId": registry_id,
1857            "repositoryName": name,
1858            "lifecyclePolicyText": policy,
1859        })))
1860    }
1861
1862    fn get_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1863        let body = request.json_body();
1864        let name = req_str(&body, "repositoryName")?.to_string();
1865        let account = target_account_id(request, &body);
1866        let accounts = self.state.read();
1867        let state = accounts
1868            .get(&account)
1869            .ok_or_else(|| repository_not_found(&name))?;
1870        let repo = state
1871            .repositories
1872            .get(&name)
1873            .ok_or_else(|| repository_not_found(&name))?;
1874        let policy = repo
1875            .lifecycle_policy
1876            .clone()
1877            .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1878        Ok(AwsResponse::ok_json(json!({
1879            "registryId": repo.registry_id,
1880            "repositoryName": name,
1881            "lifecyclePolicyText": policy,
1882            "lastEvaluatedAt": Utc::now().timestamp(),
1883        })))
1884    }
1885
1886    fn delete_lifecycle_policy(
1887        &self,
1888        request: &AwsRequest,
1889    ) -> Result<AwsResponse, AwsServiceError> {
1890        let body = request.json_body();
1891        let name = req_str(&body, "repositoryName")?.to_string();
1892        let account = target_account_id(request, &body);
1893        let mut accounts = self.state.write();
1894        let state = accounts
1895            .get_mut(&account)
1896            .ok_or_else(|| repository_not_found(&name))?;
1897        let repo = state
1898            .repositories
1899            .get_mut(&name)
1900            .ok_or_else(|| repository_not_found(&name))?;
1901        let policy = repo
1902            .lifecycle_policy
1903            .take()
1904            .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1905        let registry_id = repo.registry_id.clone();
1906        Ok(AwsResponse::ok_json(json!({
1907            "registryId": registry_id,
1908            "repositoryName": name,
1909            "lifecyclePolicyText": policy,
1910            "lastEvaluatedAt": Utc::now().timestamp(),
1911        })))
1912    }
1913
1914    fn start_lifecycle_policy_preview(
1915        &self,
1916        request: &AwsRequest,
1917    ) -> Result<AwsResponse, AwsServiceError> {
1918        let body = request.json_body();
1919        let name = req_str(&body, "repositoryName")?.to_string();
1920        let account = target_account_id(request, &body);
1921        let policy = match opt_str(&body, "lifecyclePolicyText") {
1922            Some(s) => s.to_string(),
1923            None => {
1924                let accounts = self.state.read();
1925                let state = accounts
1926                    .get(&account)
1927                    .ok_or_else(|| repository_not_found(&name))?;
1928                let repo = state
1929                    .repositories
1930                    .get(&name)
1931                    .ok_or_else(|| repository_not_found(&name))?;
1932                repo.lifecycle_policy
1933                    .clone()
1934                    .ok_or_else(|| lifecycle_policy_not_found(&name))?
1935            }
1936        };
1937        let accounts = self.state.read();
1938        let state = accounts
1939            .get(&account)
1940            .ok_or_else(|| repository_not_found(&name))?;
1941        let repo = state
1942            .repositories
1943            .get(&name)
1944            .ok_or_else(|| repository_not_found(&name))?;
1945        let _prune = evaluate_lifecycle_policy(repo, &policy);
1946        Ok(AwsResponse::ok_json(json!({
1947            "registryId": repo.registry_id,
1948            "repositoryName": name,
1949            "lifecyclePolicyText": policy,
1950            "status": "COMPLETE",
1951        })))
1952    }
1953
1954    fn get_lifecycle_policy_preview(
1955        &self,
1956        request: &AwsRequest,
1957    ) -> Result<AwsResponse, AwsServiceError> {
1958        let body = request.json_body();
1959        let name = req_str(&body, "repositoryName")?.to_string();
1960        let account = target_account_id(request, &body);
1961        let accounts = self.state.read();
1962        let state = accounts
1963            .get(&account)
1964            .ok_or_else(|| repository_not_found(&name))?;
1965        let repo = state
1966            .repositories
1967            .get(&name)
1968            .ok_or_else(|| repository_not_found(&name))?;
1969        let policy = repo
1970            .lifecycle_policy
1971            .clone()
1972            .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1973        let prune = evaluate_lifecycle_policy(repo, &policy);
1974        let results: Vec<Value> = prune
1975            .iter()
1976            .map(|digest| {
1977                json!({
1978                    "imageDigest": digest,
1979                    "imagePushedAt": repo.images.get(digest).map(|i| i.image_pushed_at.timestamp()).unwrap_or(0),
1980                    "action": {"type": "EXPIRE"},
1981                })
1982            })
1983            .collect();
1984        Ok(AwsResponse::ok_json(json!({
1985            "registryId": repo.registry_id,
1986            "repositoryName": name,
1987            "lifecyclePolicyText": policy,
1988            "status": "COMPLETE",
1989            "previewResults": results,
1990            "summary": {"expiringImageTotalCount": prune.len()},
1991        })))
1992    }
1993
1994    fn start_image_scan(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1995        use crate::state::ImageScanFindings;
1996        let body = request.json_body();
1997        let name = req_str(&body, "repositoryName")?.to_string();
1998        let image_id = body
1999            .get("imageId")
2000            .cloned()
2001            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2002        let account = target_account_id(request, &body);
2003        let (digest, layers, registry_id) = {
2004            let mut accounts = self.state.write();
2005            let state = accounts
2006                .get_mut(&account)
2007                .ok_or_else(|| repository_not_found(&name))?;
2008            let repo = state
2009                .repositories
2010                .get_mut(&name)
2011                .ok_or_else(|| repository_not_found(&name))?;
2012            let digest = resolve_image_digest(repo, &image_id)
2013                .ok_or_else(|| image_not_found(&name, &image_id))?;
2014            // Mark scan IN_PROGRESS; real findings written by the
2015            // background scanner task. Caller polls
2016            // DescribeImageScanFindings to observe completion.
2017            repo.scan_findings.insert(
2018                digest.clone(),
2019                ImageScanFindings {
2020                    image_digest: digest.clone(),
2021                    scan_status: "IN_PROGRESS".to_string(),
2022                    scan_completed_at: None,
2023                    vulnerability_source_updated_at: None,
2024                    finding_severity_counts: BTreeMap::new(),
2025                    findings: Vec::new(),
2026                },
2027            );
2028            // Scope the scan to layers actually referenced by *this*
2029            // image's manifest. Other images sitting in the same repo
2030            // (different tag, different digest) must not contaminate
2031            // the findings — Trivy would otherwise report CVEs from
2032            // unrelated images.
2033            let layers = layers_for_image(repo, &digest);
2034            (digest, layers, repo.registry_id.clone())
2035        };
2036
2037        let shared = self.state.clone();
2038        let store = self.snapshot_store.clone();
2039        let snap_lock = self.snapshot_lock.clone();
2040        let account_for_task = account.clone();
2041        let name_for_task = name.clone();
2042        let digest_for_task = digest.clone();
2043        tokio::spawn(async move {
2044            let result = crate::scanner::scan_layers(&digest_for_task, &layers).await;
2045            {
2046                let mut accounts = shared.write();
2047                let Some(state) = accounts.get_mut(&account_for_task) else {
2048                    return;
2049                };
2050                let Some(repo) = state.repositories.get_mut(&name_for_task) else {
2051                    return;
2052                };
2053                let findings = result.unwrap_or_else(|| ImageScanFindings {
2054                    image_digest: digest_for_task.clone(),
2055                    scan_status: "COMPLETE".to_string(),
2056                    scan_completed_at: Some(Utc::now()),
2057                    vulnerability_source_updated_at: Some(Utc::now()),
2058                    finding_severity_counts: BTreeMap::new(),
2059                    findings: Vec::new(),
2060                });
2061                repo.scan_findings.insert(digest_for_task.clone(), findings);
2062            }
2063            // Persist the COMPLETE state — without this, restarts
2064            // restore the snapshot taken at IN_PROGRESS time and the
2065            // scan appears stuck forever.
2066            EcrService::save_snapshot_with(shared, store, snap_lock).await;
2067        });
2068
2069        Ok(AwsResponse::ok_json(json!({
2070            "registryId": registry_id,
2071            "repositoryName": name,
2072            "imageId": image_id,
2073            "imageScanStatus": {"status": "IN_PROGRESS"},
2074        })))
2075    }
2076
2077    fn describe_image_scan_findings(
2078        &self,
2079        request: &AwsRequest,
2080    ) -> Result<AwsResponse, AwsServiceError> {
2081        let body = request.json_body();
2082        let name = req_str(&body, "repositoryName")?.to_string();
2083        let image_id = body
2084            .get("imageId")
2085            .cloned()
2086            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2087        let account = target_account_id(request, &body);
2088        let accounts = self.state.read();
2089        let state = accounts
2090            .get(&account)
2091            .ok_or_else(|| repository_not_found(&name))?;
2092        let repo = state
2093            .repositories
2094            .get(&name)
2095            .ok_or_else(|| repository_not_found(&name))?;
2096        let digest = resolve_image_digest(repo, &image_id)
2097            .ok_or_else(|| image_not_found(&name, &image_id))?;
2098        let findings = repo.scan_findings.get(&digest).cloned().unwrap_or_else(|| {
2099            crate::state::ImageScanFindings {
2100                image_digest: digest.clone(),
2101                scan_status: "COMPLETE".to_string(),
2102                scan_completed_at: Some(Utc::now()),
2103                vulnerability_source_updated_at: Some(Utc::now()),
2104                finding_severity_counts: BTreeMap::new(),
2105                findings: Vec::new(),
2106            }
2107        });
2108        Ok(AwsResponse::ok_json(json!({
2109            "registryId": repo.registry_id,
2110            "repositoryName": name,
2111            "imageId": image_id,
2112            "imageScanStatus": {"status": findings.scan_status},
2113            "imageScanFindings": {
2114                "imageScanCompletedAt": findings.scan_completed_at.map(|t| t.timestamp()),
2115                "vulnerabilitySourceUpdatedAt": findings.vulnerability_source_updated_at.map(|t| t.timestamp()),
2116                "findings": findings.findings,
2117                "findingSeverityCounts": findings.finding_severity_counts,
2118            },
2119        })))
2120    }
2121
2122    fn describe_registry(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2123        let body = request.json_body();
2124        let account = target_account_id(request, &body);
2125        let accounts = self.state.read();
2126        let state = accounts.get(&account);
2127        let registry_id = state
2128            .map(|s| s.account_id.clone())
2129            .unwrap_or_else(|| account.clone());
2130        let rules = state
2131            .and_then(|s| s.replication_configuration.as_ref())
2132            .map(|cfg| {
2133                cfg.rules
2134                    .iter()
2135                    .map(|r| {
2136                        json!({
2137                            "destinations": r.destinations.iter().map(|d| json!({
2138                                "region": d.region,
2139                                "registryId": d.registry_id,
2140                            })).collect::<Vec<_>>(),
2141                            "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2142                                "filter": f.filter,
2143                                "filterType": f.filter_type,
2144                            })).collect::<Vec<_>>(),
2145                        })
2146                    })
2147                    .collect::<Vec<_>>()
2148            })
2149            .unwrap_or_default();
2150        Ok(AwsResponse::ok_json(json!({
2151            "registryId": registry_id,
2152            "replicationConfiguration": {"rules": rules},
2153        })))
2154    }
2155
2156    fn get_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2157        let body = request.json_body();
2158        let account = target_account_id(request, &body);
2159        let accounts = self.state.read();
2160        let state = accounts
2161            .get(&account)
2162            .ok_or_else(registry_policy_not_found)?;
2163        let policy = state
2164            .registry_policy
2165            .clone()
2166            .ok_or_else(registry_policy_not_found)?;
2167        Ok(AwsResponse::ok_json(json!({
2168            "registryId": state.account_id,
2169            "policyText": policy,
2170        })))
2171    }
2172
2173    fn put_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2174        let body = request.json_body();
2175        let policy = req_str(&body, "policyText")?.to_string();
2176        if policy.len() > 10_240 {
2177            return Err(invalid_parameter(format!(
2178                "Value at 'policyText' failed to satisfy constraint: \
2179                 Member must have length less than or equal to 10240 (got {})",
2180                policy.len()
2181            )));
2182        }
2183        let account = target_account_id(request, &body);
2184        let mut accounts = self.state.write();
2185        let state = accounts.get_or_create(&account);
2186        state.registry_policy = Some(policy.clone());
2187        Ok(AwsResponse::ok_json(json!({
2188            "registryId": state.account_id,
2189            "policyText": policy,
2190        })))
2191    }
2192
2193    fn delete_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2194        let body = request.json_body();
2195        let account = target_account_id(request, &body);
2196        let mut accounts = self.state.write();
2197        let state = accounts
2198            .get_mut(&account)
2199            .ok_or_else(registry_policy_not_found)?;
2200        let policy = state
2201            .registry_policy
2202            .take()
2203            .ok_or_else(registry_policy_not_found)?;
2204        Ok(AwsResponse::ok_json(json!({
2205            "registryId": state.account_id,
2206            "policyText": policy,
2207        })))
2208    }
2209
2210    fn get_registry_scanning_configuration(
2211        &self,
2212        request: &AwsRequest,
2213    ) -> Result<AwsResponse, AwsServiceError> {
2214        let body = request.json_body();
2215        let account = target_account_id(request, &body);
2216        let accounts = self.state.read();
2217        let state = accounts.get(&account);
2218        let cfg = state
2219            .map(|s| s.registry_scanning_configuration.clone())
2220            .unwrap_or_default();
2221        let rules: Vec<Value> = cfg
2222            .rules
2223            .iter()
2224            .map(|r| {
2225                json!({
2226                    "scanFrequency": r.scan_frequency,
2227                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2228                        "filter": f.filter,
2229                        "filterType": f.filter_type,
2230                    })).collect::<Vec<_>>(),
2231                })
2232            })
2233            .collect();
2234        Ok(AwsResponse::ok_json(json!({
2235            "registryId": state.map(|s| s.account_id.clone()).unwrap_or(account),
2236            "scanningConfiguration": {
2237                "scanType": cfg.scan_type,
2238                "rules": rules,
2239            },
2240        })))
2241    }
2242
2243    fn put_registry_scanning_configuration(
2244        &self,
2245        request: &AwsRequest,
2246    ) -> Result<AwsResponse, AwsServiceError> {
2247        use crate::state::{RegistryScanningConfiguration, RegistryScanningRule, RepositoryFilter};
2248        let body = request.json_body();
2249        let scan_type = opt_str(&body, "scanType").unwrap_or("BASIC").to_string();
2250        if scan_type != "BASIC" && scan_type != "ENHANCED" {
2251            return Err(invalid_parameter(format!(
2252                "Invalid scanType '{scan_type}'. Must be BASIC or ENHANCED."
2253            )));
2254        }
2255        let rules = body
2256            .get("rules")
2257            .and_then(|v| v.as_array())
2258            .cloned()
2259            .unwrap_or_default();
2260        let parsed_rules: Vec<RegistryScanningRule> = rules
2261            .iter()
2262            .map(|r| RegistryScanningRule {
2263                scan_frequency: r
2264                    .get("scanFrequency")
2265                    .and_then(|v| v.as_str())
2266                    .unwrap_or("SCAN_ON_PUSH")
2267                    .to_string(),
2268                repository_filters: r
2269                    .get("repositoryFilters")
2270                    .and_then(|v| v.as_array())
2271                    .map(|arr| {
2272                        arr.iter()
2273                            .map(|f| RepositoryFilter {
2274                                filter: f
2275                                    .get("filter")
2276                                    .and_then(|v| v.as_str())
2277                                    .unwrap_or("")
2278                                    .to_string(),
2279                                filter_type: f
2280                                    .get("filterType")
2281                                    .and_then(|v| v.as_str())
2282                                    .unwrap_or("WILDCARD")
2283                                    .to_string(),
2284                            })
2285                            .collect()
2286                    })
2287                    .unwrap_or_default(),
2288            })
2289            .collect();
2290        let account = target_account_id(request, &body);
2291        let mut accounts = self.state.write();
2292        let state = accounts.get_or_create(&account);
2293        state.registry_scanning_configuration = RegistryScanningConfiguration {
2294            scan_type: scan_type.clone(),
2295            rules: parsed_rules,
2296        };
2297        let cfg = state.registry_scanning_configuration.clone();
2298        Ok(AwsResponse::ok_json(json!({
2299            "registryScanningConfiguration": {
2300                "scanType": cfg.scan_type,
2301                "rules": cfg.rules.iter().map(|r| json!({
2302                    "scanFrequency": r.scan_frequency,
2303                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2304                        "filter": f.filter,
2305                        "filterType": f.filter_type,
2306                    })).collect::<Vec<_>>(),
2307                })).collect::<Vec<_>>(),
2308            },
2309        })))
2310    }
2311
2312    fn batch_get_repository_scanning_configuration(
2313        &self,
2314        request: &AwsRequest,
2315    ) -> Result<AwsResponse, AwsServiceError> {
2316        let body = request.json_body();
2317        let names: Vec<String> = body
2318            .get("repositoryNames")
2319            .and_then(|v| v.as_array())
2320            .ok_or_else(|| invalid_parameter("Missing required field: repositoryNames"))?
2321            .iter()
2322            .filter_map(|v| v.as_str().map(|s| s.to_string()))
2323            .collect();
2324        let account = target_account_id(request, &body);
2325        let accounts = self.state.read();
2326        let state = accounts
2327            .get(&account)
2328            .ok_or_else(|| repository_not_found(&account))?;
2329        let mut scanning: Vec<Value> = Vec::new();
2330        let mut failures: Vec<Value> = Vec::new();
2331        for n in &names {
2332            match state.repositories.get(n) {
2333                Some(repo) => scanning.push(json!({
2334                    "repositoryArn": repo.repository_arn,
2335                    "repositoryName": n,
2336                    "scanOnPush": repo.image_scanning_configuration.scan_on_push,
2337                    "scanFrequency": "SCAN_ON_PUSH",
2338                    "appliedScanFilters": [],
2339                })),
2340                None => failures.push(json!({
2341                    "repositoryName": n,
2342                    "failureCode": "REPOSITORY_NOT_FOUND",
2343                    "failureReason": format!("Repository '{n}' not found"),
2344                })),
2345            }
2346        }
2347        Ok(AwsResponse::ok_json(json!({
2348            "scanningConfigurations": scanning,
2349            "failures": failures,
2350        })))
2351    }
2352
2353    fn put_replication_configuration(
2354        &self,
2355        request: &AwsRequest,
2356    ) -> Result<AwsResponse, AwsServiceError> {
2357        use crate::state::{
2358            ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
2359        };
2360        let body = request.json_body();
2361        let cfg_value = body
2362            .get("replicationConfiguration")
2363            .cloned()
2364            .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
2365        let rules_value = cfg_value
2366            .get("rules")
2367            .and_then(|v| v.as_array())
2368            .cloned()
2369            .unwrap_or_default();
2370        let rules: Vec<ReplicationRule> = rules_value
2371            .iter()
2372            .map(|r| ReplicationRule {
2373                destinations: r
2374                    .get("destinations")
2375                    .and_then(|v| v.as_array())
2376                    .map(|arr| {
2377                        arr.iter()
2378                            .map(|d| ReplicationDestination {
2379                                region: d
2380                                    .get("region")
2381                                    .and_then(|v| v.as_str())
2382                                    .unwrap_or("")
2383                                    .to_string(),
2384                                registry_id: d
2385                                    .get("registryId")
2386                                    .and_then(|v| v.as_str())
2387                                    .unwrap_or("")
2388                                    .to_string(),
2389                            })
2390                            .collect()
2391                    })
2392                    .unwrap_or_default(),
2393                repository_filters: r
2394                    .get("repositoryFilters")
2395                    .and_then(|v| v.as_array())
2396                    .map(|arr| {
2397                        arr.iter()
2398                            .map(|f| RepositoryFilter {
2399                                filter: f
2400                                    .get("filter")
2401                                    .and_then(|v| v.as_str())
2402                                    .unwrap_or("")
2403                                    .to_string(),
2404                                filter_type: f
2405                                    .get("filterType")
2406                                    .and_then(|v| v.as_str())
2407                                    .unwrap_or("PREFIX_MATCH")
2408                                    .to_string(),
2409                            })
2410                            .collect()
2411                    })
2412                    .unwrap_or_default(),
2413            })
2414            .collect();
2415        let account = target_account_id(request, &body);
2416        let mut accounts = self.state.write();
2417        let state = accounts.get_or_create(&account);
2418        state.replication_configuration = Some(ReplicationConfiguration { rules });
2419        let cfg = state.replication_configuration.clone().unwrap();
2420        Ok(AwsResponse::ok_json(json!({
2421            "replicationConfiguration": {
2422                "rules": cfg.rules.iter().map(|r| json!({
2423                    "destinations": r.destinations.iter().map(|d| json!({
2424                        "region": d.region,
2425                        "registryId": d.registry_id,
2426                    })).collect::<Vec<_>>(),
2427                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2428                        "filter": f.filter,
2429                        "filterType": f.filter_type,
2430                    })).collect::<Vec<_>>(),
2431                })).collect::<Vec<_>>(),
2432            },
2433        })))
2434    }
2435
2436    fn describe_image_replication_status(
2437        &self,
2438        request: &AwsRequest,
2439    ) -> Result<AwsResponse, AwsServiceError> {
2440        let body = request.json_body();
2441        let name = req_str(&body, "repositoryName")?.to_string();
2442        let image_id = body
2443            .get("imageId")
2444            .cloned()
2445            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2446        let account = target_account_id(request, &body);
2447        let accounts = self.state.read();
2448        let state = accounts
2449            .get(&account)
2450            .ok_or_else(|| repository_not_found(&name))?;
2451        let repo = state
2452            .repositories
2453            .get(&name)
2454            .ok_or_else(|| repository_not_found(&name))?;
2455        if resolve_image_digest(repo, &image_id).is_none() {
2456            return Err(image_not_found(&name, &image_id));
2457        }
2458        Ok(AwsResponse::ok_json(json!({
2459            "repositoryName": name,
2460            "imageId": image_id,
2461            "replicationStatuses": [],
2462        })))
2463    }
2464
2465    fn create_pull_through_cache_rule(
2466        &self,
2467        request: &AwsRequest,
2468    ) -> Result<AwsResponse, AwsServiceError> {
2469        use crate::state::PullThroughCacheRule;
2470        let body = request.json_body();
2471        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2472        validate_pullthrough_prefix(&prefix)?;
2473        let upstream_url = req_str(&body, "upstreamRegistryUrl")?.to_string();
2474        let account = target_account_id(request, &body);
2475        let mut accounts = self.state.write();
2476        let state = accounts.get_or_create(&account);
2477        if state.pull_through_cache_rules.contains_key(&prefix) {
2478            return Err(AwsServiceError::aws_error(
2479                StatusCode::BAD_REQUEST,
2480                "PullThroughCacheRuleAlreadyExistsException",
2481                format!("A pull through cache rule with the prefix '{prefix}' already exists."),
2482            ));
2483        }
2484        let now = Utc::now();
2485        let rule = PullThroughCacheRule {
2486            ecr_repository_prefix: prefix.clone(),
2487            upstream_registry_url: upstream_url.clone(),
2488            upstream_registry: opt_str(&body, "upstreamRegistry").map(|s| s.to_string()),
2489            credential_arn: opt_str(&body, "credentialArn").map(|s| s.to_string()),
2490            created_at: now,
2491            updated_at: now,
2492            custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
2493        };
2494        state
2495            .pull_through_cache_rules
2496            .insert(prefix.clone(), rule.clone());
2497        Ok(AwsResponse::ok_json(pull_through_rule_json(
2498            state.account_id.as_str(),
2499            &rule,
2500        )))
2501    }
2502
2503    fn delete_pull_through_cache_rule(
2504        &self,
2505        request: &AwsRequest,
2506    ) -> Result<AwsResponse, AwsServiceError> {
2507        let body = request.json_body();
2508        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2509        validate_pullthrough_prefix(&prefix)?;
2510        let account = target_account_id(request, &body);
2511        let mut accounts = self.state.write();
2512        let state = accounts.get_or_create(&account);
2513        let removed = state
2514            .pull_through_cache_rules
2515            .remove(&prefix)
2516            .ok_or_else(|| {
2517                AwsServiceError::aws_error(
2518                    StatusCode::BAD_REQUEST,
2519                    "PullThroughCacheRuleNotFoundException",
2520                    format!("No pull through cache rule with prefix '{prefix}' exists."),
2521                )
2522            })?;
2523        // DeletePullThroughCacheRuleResponse omits upstreamRegistry per
2524        // the Smithy model — it only appears on Create/Describe.
2525        let mut response = pull_through_rule_json(state.account_id.as_str(), &removed);
2526        if let Value::Object(ref mut map) = response {
2527            map.remove("upstreamRegistry");
2528        }
2529        Ok(AwsResponse::ok_json(response))
2530    }
2531
2532    fn describe_pull_through_cache_rules(
2533        &self,
2534        request: &AwsRequest,
2535    ) -> Result<AwsResponse, AwsServiceError> {
2536        let body = request.json_body();
2537        validate_max_results(&body)?;
2538        let prefixes: Vec<String> = body
2539            .get("ecrRepositoryPrefixes")
2540            .and_then(|v| v.as_array())
2541            .map(|arr| {
2542                arr.iter()
2543                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2544                    .collect()
2545            })
2546            .unwrap_or_default();
2547        let account = target_account_id(request, &body);
2548        let accounts = self.state.read();
2549        let state = accounts.get(&account);
2550        let rules: Vec<&crate::state::PullThroughCacheRule> = state
2551            .map(|s| s.pull_through_cache_rules.values().collect())
2552            .unwrap_or_default();
2553        let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
2554        let filtered: Vec<Value> = rules
2555            .iter()
2556            .filter(|r| prefixes.is_empty() || prefixes.contains(&r.ecr_repository_prefix))
2557            .map(|r| pull_through_rule_json_with_updated(&registry_id, r))
2558            .collect();
2559        Ok(AwsResponse::ok_json(json!({
2560            "pullThroughCacheRules": filtered,
2561        })))
2562    }
2563
2564    fn update_pull_through_cache_rule(
2565        &self,
2566        request: &AwsRequest,
2567    ) -> Result<AwsResponse, AwsServiceError> {
2568        let body = request.json_body();
2569        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2570        let account = target_account_id(request, &body);
2571        let mut accounts = self.state.write();
2572        let state = accounts.get_or_create(&account);
2573        let rule = state
2574            .pull_through_cache_rules
2575            .get_mut(&prefix)
2576            .ok_or_else(|| {
2577                AwsServiceError::aws_error(
2578                    StatusCode::BAD_REQUEST,
2579                    "PullThroughCacheRuleNotFoundException",
2580                    format!("No pull through cache rule with prefix '{prefix}' exists."),
2581                )
2582            })?;
2583        if let Some(cred) = opt_str(&body, "credentialArn") {
2584            rule.credential_arn = Some(cred.to_string());
2585        }
2586        if let Some(role) = opt_str(&body, "customRoleArn") {
2587            rule.custom_role_arn = Some(role.to_string());
2588        }
2589        rule.updated_at = Utc::now();
2590        let response = pull_through_rule_json_with_updated(state.account_id.as_str(), rule);
2591        Ok(AwsResponse::ok_json(response))
2592    }
2593
2594    fn validate_pull_through_cache_rule(
2595        &self,
2596        request: &AwsRequest,
2597    ) -> Result<AwsResponse, AwsServiceError> {
2598        let body = request.json_body();
2599        let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2600        let account = target_account_id(request, &body);
2601        let accounts = self.state.read();
2602        let state = accounts.get(&account);
2603        let rule = state
2604            .and_then(|s| s.pull_through_cache_rules.get(&prefix))
2605            .ok_or_else(|| {
2606                AwsServiceError::aws_error(
2607                    StatusCode::BAD_REQUEST,
2608                    "PullThroughCacheRuleNotFoundException",
2609                    format!("No pull through cache rule with prefix '{prefix}' exists."),
2610                )
2611            })?;
2612        let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
2613        let mut base = pull_through_rule_json(&registry_id, rule);
2614        base["isValid"] = json!(true);
2615        Ok(AwsResponse::ok_json(base))
2616    }
2617
2618    fn get_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2619        let body = request.json_body();
2620        let name = req_str(&body, "name")?.to_string();
2621        validate_account_setting_name(&name)?;
2622        let account = target_account_id(request, &body);
2623        let accounts = self.state.read();
2624        let state = accounts.get(&account);
2625        let value = state
2626            .and_then(|s| s.account_settings.get(&name).cloned())
2627            .unwrap_or_else(|| "DISABLED".to_string());
2628        Ok(AwsResponse::ok_json(json!({
2629            "name": name,
2630            "value": value,
2631        })))
2632    }
2633
2634    fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2635        let body = request.json_body();
2636        let name = req_str(&body, "name")?.to_string();
2637        validate_account_setting_name(&name)?;
2638        let value = req_str(&body, "value")?.to_string();
2639        let account = target_account_id(request, &body);
2640        let mut accounts = self.state.write();
2641        let state = accounts.get_or_create(&account);
2642        state.account_settings.insert(name.clone(), value.clone());
2643        Ok(AwsResponse::ok_json(json!({
2644            "name": name,
2645            "value": value,
2646        })))
2647    }
2648
2649    fn create_repository_creation_template(
2650        &self,
2651        request: &AwsRequest,
2652    ) -> Result<AwsResponse, AwsServiceError> {
2653        use crate::state::{EncryptionConfiguration as Enc, RepositoryCreationTemplate};
2654        let body = request.json_body();
2655        let prefix = req_str(&body, "prefix")?.to_string();
2656        validate_template_prefix(&prefix)?;
2657        let applied_for: Vec<String> = body
2658            .get("appliedFor")
2659            .and_then(|v| v.as_array())
2660            .map(|arr| {
2661                arr.iter()
2662                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2663                    .collect()
2664            })
2665            .unwrap_or_default();
2666        let image_tag_mutability = opt_str(&body, "imageTagMutability")
2667            .unwrap_or("MUTABLE")
2668            .to_string();
2669        let resource_tags = body
2670            .get("resourceTags")
2671            .and_then(|v| v.as_array())
2672            .cloned()
2673            .unwrap_or_default();
2674        let encryption = body.get("encryptionConfiguration").map(|v| Enc {
2675            encryption_type: v
2676                .get("encryptionType")
2677                .and_then(|x| x.as_str())
2678                .unwrap_or("AES256")
2679                .to_string(),
2680            kms_key: v
2681                .get("kmsKey")
2682                .and_then(|x| x.as_str())
2683                .map(|s| s.to_string()),
2684        });
2685        let account = target_account_id(request, &body);
2686        let mut accounts = self.state.write();
2687        let state = accounts.get_or_create(&account);
2688        if state.repository_creation_templates.contains_key(&prefix) {
2689            return Err(AwsServiceError::aws_error(
2690                StatusCode::BAD_REQUEST,
2691                "TemplateAlreadyExistsException",
2692                format!(
2693                    "A repository creation template with the prefix '{prefix}' already exists."
2694                ),
2695            ));
2696        }
2697        let now = Utc::now();
2698        let tpl = RepositoryCreationTemplate {
2699            prefix: prefix.clone(),
2700            description: opt_str(&body, "description").map(|s| s.to_string()),
2701            image_tag_mutability,
2702            applied_for,
2703            resource_tags,
2704            created_at: now,
2705            updated_at: now,
2706            custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
2707            repository_policy: opt_str(&body, "repositoryPolicy").map(|s| s.to_string()),
2708            lifecycle_policy: opt_str(&body, "lifecyclePolicy").map(|s| s.to_string()),
2709            encryption_configuration: encryption,
2710        };
2711        state
2712            .repository_creation_templates
2713            .insert(prefix, tpl.clone());
2714        Ok(AwsResponse::ok_json(json!({
2715            "registryId": state.account_id,
2716            "repositoryCreationTemplate": template_to_json(&tpl),
2717        })))
2718    }
2719
2720    fn delete_repository_creation_template(
2721        &self,
2722        request: &AwsRequest,
2723    ) -> Result<AwsResponse, AwsServiceError> {
2724        let body = request.json_body();
2725        let prefix = req_str(&body, "prefix")?.to_string();
2726        validate_template_prefix(&prefix)?;
2727        let account = target_account_id(request, &body);
2728        let mut accounts = self.state.write();
2729        let state = accounts.get_or_create(&account);
2730        let removed = state
2731            .repository_creation_templates
2732            .remove(&prefix)
2733            .ok_or_else(|| {
2734                AwsServiceError::aws_error(
2735                    StatusCode::BAD_REQUEST,
2736                    "TemplateNotFoundException",
2737                    format!("No repository creation template with prefix '{prefix}' exists."),
2738                )
2739            })?;
2740        Ok(AwsResponse::ok_json(json!({
2741            "registryId": state.account_id,
2742            "repositoryCreationTemplate": template_to_json(&removed),
2743        })))
2744    }
2745
2746    fn describe_repository_creation_templates(
2747        &self,
2748        request: &AwsRequest,
2749    ) -> Result<AwsResponse, AwsServiceError> {
2750        let body = request.json_body();
2751        validate_max_results(&body)?;
2752        let prefixes: Vec<String> = body
2753            .get("prefixes")
2754            .and_then(|v| v.as_array())
2755            .map(|arr| {
2756                arr.iter()
2757                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
2758                    .collect()
2759            })
2760            .unwrap_or_default();
2761        let account = target_account_id(request, &body);
2762        let accounts = self.state.read();
2763        let state = accounts.get(&account);
2764        let tpls: Vec<Value> = state
2765            .map(|s| {
2766                s.repository_creation_templates
2767                    .values()
2768                    .filter(|t| prefixes.is_empty() || prefixes.contains(&t.prefix))
2769                    .map(template_to_json)
2770                    .collect()
2771            })
2772            .unwrap_or_default();
2773        Ok(AwsResponse::ok_json(json!({
2774            "registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
2775            "repositoryCreationTemplates": tpls,
2776        })))
2777    }
2778
2779    fn update_repository_creation_template(
2780        &self,
2781        request: &AwsRequest,
2782    ) -> Result<AwsResponse, AwsServiceError> {
2783        let body = request.json_body();
2784        let prefix = req_str(&body, "prefix")?.to_string();
2785        validate_template_prefix(&prefix)?;
2786        let account = target_account_id(request, &body);
2787        let mut accounts = self.state.write();
2788        let state = accounts.get_or_create(&account);
2789        let tpl = state
2790            .repository_creation_templates
2791            .get_mut(&prefix)
2792            .ok_or_else(|| {
2793                AwsServiceError::aws_error(
2794                    StatusCode::BAD_REQUEST,
2795                    "TemplateNotFoundException",
2796                    format!("No repository creation template with prefix '{prefix}' exists."),
2797                )
2798            })?;
2799        if let Some(desc) = opt_str(&body, "description") {
2800            tpl.description = Some(desc.to_string());
2801        }
2802        if let Some(mutability) = opt_str(&body, "imageTagMutability") {
2803            tpl.image_tag_mutability = mutability.to_string();
2804        }
2805        if let Some(arr) = body.get("appliedFor").and_then(|v| v.as_array()) {
2806            tpl.applied_for = arr
2807                .iter()
2808                .filter_map(|v| v.as_str().map(|s| s.to_string()))
2809                .collect();
2810        }
2811        if let Some(arr) = body.get("resourceTags").and_then(|v| v.as_array()) {
2812            tpl.resource_tags = arr.clone();
2813        }
2814        tpl.updated_at = Utc::now();
2815        Ok(AwsResponse::ok_json(json!({
2816            "registryId": state.account_id,
2817            "repositoryCreationTemplate": template_to_json(tpl),
2818        })))
2819    }
2820
2821    fn get_signing_configuration(
2822        &self,
2823        request: &AwsRequest,
2824    ) -> Result<AwsResponse, AwsServiceError> {
2825        let body = request.json_body();
2826        let account = target_account_id(request, &body);
2827        let accounts = self.state.read();
2828        let state = accounts.get(&account);
2829        let rules: Vec<Value> = state
2830            .and_then(|s| s.signing_configuration.as_ref())
2831            .map(|c| c.rules.clone())
2832            .unwrap_or_default();
2833        Ok(AwsResponse::ok_json(json!({
2834            "registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
2835            "signingConfiguration": {"rules": rules},
2836        })))
2837    }
2838
2839    fn put_signing_configuration(
2840        &self,
2841        request: &AwsRequest,
2842    ) -> Result<AwsResponse, AwsServiceError> {
2843        use crate::signing::TrustedKey;
2844        use crate::state::SigningConfiguration;
2845        let body = request.json_body();
2846        let cfg = body
2847            .get("signingConfiguration")
2848            .ok_or_else(|| invalid_parameter("Missing required field: signingConfiguration"))?;
2849        let rules: Vec<Value> = cfg
2850            .get("rules")
2851            .and_then(|v| v.as_array())
2852            .cloned()
2853            .unwrap_or_default();
2854
2855        // Extract trusted keys from the rules. AWS's real signing-config
2856        // schema is nested; we accept the minimal fakecloud-friendly
2857        // shape `{trustedKeys: [{keyId, pem, algorithm}]}` per rule.
2858        // Rules that don't carry recognised keys are still
2859        // round-trippable via the raw `rules` passthrough.
2860        let mut trusted_keys: Vec<TrustedKey> = Vec::new();
2861        for rule in &rules {
2862            let keys = match rule.get("trustedKeys").and_then(|v| v.as_array()) {
2863                Some(k) => k,
2864                None => continue,
2865            };
2866            for k in keys {
2867                let key_id = k
2868                    .get("keyId")
2869                    .and_then(|v| v.as_str())
2870                    .unwrap_or_default()
2871                    .to_string();
2872                let pem = match k.get("pem").and_then(|v| v.as_str()) {
2873                    Some(p) => p.to_string(),
2874                    None => continue,
2875                };
2876                let algorithm = k
2877                    .get("algorithm")
2878                    .and_then(|v| v.as_str())
2879                    .unwrap_or("ECDSA-P256")
2880                    .to_string();
2881                // Validate the PEM up front so bad rules fail
2882                // PutSigningConfiguration rather than silently skipping
2883                // verification at describe time.
2884                if <p256::ecdsa::VerifyingKey as p256::pkcs8::DecodePublicKey>::from_public_key_pem(
2885                    &pem,
2886                )
2887                .is_err()
2888                {
2889                    return Err(invalid_parameter(format!(
2890                        "trusted key {key_id} is not a valid ECDSA-P256 PEM-encoded public key"
2891                    )));
2892                }
2893                trusted_keys.push(TrustedKey {
2894                    key_id,
2895                    pem,
2896                    algorithm,
2897                });
2898            }
2899        }
2900
2901        let account = target_account_id(request, &body);
2902        let mut accounts = self.state.write();
2903        let state = accounts.get_or_create(&account);
2904        state.signing_configuration = Some(SigningConfiguration {
2905            rules: rules.clone(),
2906            trusted_keys,
2907        });
2908        Ok(AwsResponse::ok_json(json!({
2909            "signingConfiguration": {"rules": rules},
2910        })))
2911    }
2912
2913    fn delete_signing_configuration(
2914        &self,
2915        request: &AwsRequest,
2916    ) -> Result<AwsResponse, AwsServiceError> {
2917        let body = request.json_body();
2918        let account = target_account_id(request, &body);
2919        let mut accounts = self.state.write();
2920        let state = accounts.get_or_create(&account);
2921        state.signing_configuration = None;
2922        Ok(AwsResponse::ok_json(json!({})))
2923    }
2924
2925    fn describe_image_signing_status(
2926        &self,
2927        request: &AwsRequest,
2928    ) -> Result<AwsResponse, AwsServiceError> {
2929        let body = request.json_body();
2930        let name = req_str(&body, "repositoryName")?.to_string();
2931        let image_id = body
2932            .get("imageId")
2933            .cloned()
2934            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2935        let account = target_account_id(request, &body);
2936        let accounts = self.state.read();
2937        let state = accounts
2938            .get(&account)
2939            .ok_or_else(|| repository_not_found(&name))?;
2940        let repo = state
2941            .repositories
2942            .get(&name)
2943            .ok_or_else(|| repository_not_found(&name))?;
2944        let image_digest = resolve_image_digest(repo, &image_id)
2945            .ok_or_else(|| image_not_found(&name, &image_id))?;
2946
2947        let trusted_keys: &[crate::signing::TrustedKey] = state
2948            .signing_configuration
2949            .as_ref()
2950            .map(|c| c.trusted_keys.as_slice())
2951            .unwrap_or(&[]);
2952
2953        // Locate the cosign companion signature tag
2954        // (`sha256-<hex>.sig`) in the same repo. Absent -> UNSIGNED.
2955        let sig_tag = match crate::signing::companion_sig_tag(&image_digest) {
2956            Some(t) => t,
2957            None => {
2958                return Ok(AwsResponse::ok_json(json!({
2959                    "registryId": repo.registry_id,
2960                    "repositoryName": name,
2961                    "imageId": image_id,
2962                    "imageSignatures": [],
2963                    "signingStatus": "UNSIGNED",
2964                })));
2965            }
2966        };
2967        let sig_manifest_digest = match repo.image_tags.get(&sig_tag) {
2968            Some(d) => d,
2969            None => {
2970                return Ok(AwsResponse::ok_json(json!({
2971                    "registryId": repo.registry_id,
2972                    "repositoryName": name,
2973                    "imageId": image_id,
2974                    "imageSignatures": [],
2975                    "signingStatus": "UNSIGNED",
2976                })));
2977            }
2978        };
2979        let sig_image = match repo.images.get(sig_manifest_digest) {
2980            Some(i) => i,
2981            None => {
2982                return Ok(AwsResponse::ok_json(json!({
2983                    "registryId": repo.registry_id,
2984                    "repositoryName": name,
2985                    "imageId": image_id,
2986                    "imageSignatures": [],
2987                    "signingStatus": "UNSIGNED",
2988                })));
2989            }
2990        };
2991
2992        let manifest_json: Value = match serde_json::from_str(&sig_image.image_manifest) {
2993            Ok(v) => v,
2994            Err(_) => {
2995                return Ok(AwsResponse::ok_json(json!({
2996                    "registryId": repo.registry_id,
2997                    "repositoryName": name,
2998                    "imageId": image_id,
2999                    "imageSignatures": [],
3000                    "signingStatus": "INVALID_SIGNATURE",
3001                })));
3002            }
3003        };
3004        let (layer_digest, signature_b64) =
3005            match crate::signing::extract_signature_annotation(&manifest_json) {
3006                Some(x) => x,
3007                None => {
3008                    return Ok(AwsResponse::ok_json(json!({
3009                        "registryId": repo.registry_id,
3010                        "repositoryName": name,
3011                        "imageId": image_id,
3012                        "imageSignatures": [],
3013                        "signingStatus": "UNSIGNED",
3014                    })));
3015                }
3016            };
3017
3018        // Fetch the payload (signed bytes = simple-signing JSON blob).
3019        let payload_bytes: Vec<u8> = match repo.layers.get(&layer_digest) {
3020            Some(layer) => base64::Engine::decode(
3021                &base64::engine::general_purpose::STANDARD,
3022                layer.blob_b64.as_bytes(),
3023            )
3024            .unwrap_or_default(),
3025            None => {
3026                return Ok(AwsResponse::ok_json(json!({
3027                    "registryId": repo.registry_id,
3028                    "repositoryName": name,
3029                    "imageId": image_id,
3030                    "imageSignatures": [],
3031                    "signingStatus": "UNSIGNED",
3032                })));
3033            }
3034        };
3035
3036        // The payload must name the signed image — catches copy-paste
3037        // of one image's signature onto another.
3038        if let Some(named) = crate::signing::referenced_image_digest(&payload_bytes) {
3039            if named != image_digest {
3040                return Ok(AwsResponse::ok_json(json!({
3041                    "registryId": repo.registry_id,
3042                    "repositoryName": name,
3043                    "imageId": image_id,
3044                    "imageSignatures": [],
3045                    "signingStatus": "INVALID_SIGNATURE",
3046                    "statusReason": "signature payload references a different image digest",
3047                })));
3048            }
3049        }
3050
3051        // Verify against each trusted key until one matches.
3052        let mut matched: Option<&crate::signing::TrustedKey> = None;
3053        for key in trusted_keys {
3054            if crate::signing::verify_cosign_signature(&key.pem, &payload_bytes, &signature_b64)
3055                .is_ok()
3056            {
3057                matched = Some(key);
3058                break;
3059            }
3060        }
3061
3062        let mut response = json!({
3063            "registryId": repo.registry_id,
3064            "repositoryName": name,
3065            "imageId": image_id,
3066        });
3067        if let Some(key) = matched {
3068            response["imageSignatures"] = json!([{
3069                "signatureFormat": "COSIGN",
3070                "keyId": key.key_id,
3071                "algorithm": key.algorithm,
3072                "valid": true,
3073            }]);
3074            response["signingStatus"] = json!("SIGNED");
3075        } else if trusted_keys.is_empty() {
3076            // A valid-looking signature exists but no trusted keys are
3077            // configured to verify against — surface the signature but
3078            // mark it UNVERIFIED so the caller knows to configure keys.
3079            response["imageSignatures"] = json!([{
3080                "signatureFormat": "COSIGN",
3081                "valid": false,
3082                "statusReason": "no trusted keys configured"
3083            }]);
3084            response["signingStatus"] = json!("UNVERIFIED");
3085        } else {
3086            response["imageSignatures"] = json!([{
3087                "signatureFormat": "COSIGN",
3088                "valid": false,
3089                "statusReason": "signature did not match any trusted key"
3090            }]);
3091            response["signingStatus"] = json!("INVALID_SIGNATURE");
3092        }
3093        Ok(AwsResponse::ok_json(response))
3094    }
3095
3096    fn register_pull_time_update_exclusion(
3097        &self,
3098        request: &AwsRequest,
3099    ) -> Result<AwsResponse, AwsServiceError> {
3100        use crate::state::PullTimeExclusion;
3101        let body = request.json_body();
3102        let principal_arn = req_str(&body, "principalArn")?.to_string();
3103        // Smithy `com.amazonaws.ecr#PrincipalArn` length 0..=200.
3104        validate_string_length("principalArn", &principal_arn, 0, 200)?;
3105        let account = target_account_id(request, &body);
3106        let mut accounts = self.state.write();
3107        let state = accounts.get_or_create(&account);
3108        state
3109            .pull_time_exclusions
3110            .entry(principal_arn.clone())
3111            .or_insert_with(|| PullTimeExclusion {
3112                principal_arn: principal_arn.clone(),
3113                registered_at: Utc::now(),
3114            });
3115        Ok(AwsResponse::ok_json(json!({
3116            "principalArn": principal_arn,
3117        })))
3118    }
3119
3120    fn deregister_pull_time_update_exclusion(
3121        &self,
3122        request: &AwsRequest,
3123    ) -> Result<AwsResponse, AwsServiceError> {
3124        let body = request.json_body();
3125        let principal_arn = req_str(&body, "principalArn")?.to_string();
3126        validate_string_length("principalArn", &principal_arn, 0, 200)?;
3127        let account = target_account_id(request, &body);
3128        let mut accounts = self.state.write();
3129        let state = accounts.get_or_create(&account);
3130        state.pull_time_exclusions.remove(&principal_arn);
3131        Ok(AwsResponse::ok_json(json!({
3132            "principalArn": principal_arn,
3133        })))
3134    }
3135
3136    fn list_pull_time_update_exclusions(
3137        &self,
3138        request: &AwsRequest,
3139    ) -> Result<AwsResponse, AwsServiceError> {
3140        let body = request.json_body();
3141        validate_max_results(&body)?;
3142        let account = target_account_id(request, &body);
3143        let accounts = self.state.read();
3144        let state = accounts.get(&account);
3145        let exclusions: Vec<Value> = state
3146            .map(|s| {
3147                s.pull_time_exclusions
3148                    .values()
3149                    .map(|e| {
3150                        json!({
3151                            "principalArn": e.principal_arn,
3152                            "registeredAt": e.registered_at.timestamp(),
3153                        })
3154                    })
3155                    .collect()
3156            })
3157            .unwrap_or_default();
3158        Ok(AwsResponse::ok_json(json!({
3159            "pullTimeUpdateExclusions": exclusions,
3160        })))
3161    }
3162
3163    fn list_image_referrers(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3164        let body = request.json_body();
3165        let name = req_str(&body, "repositoryName")?.to_string();
3166        let subject = body
3167            .get("subjectId")
3168            .cloned()
3169            .ok_or_else(|| invalid_parameter("Missing subjectId"))?;
3170        let digest = subject
3171            .get("imageDigest")
3172            .and_then(|v| v.as_str())
3173            .ok_or_else(|| invalid_parameter("subjectId.imageDigest is required"))?
3174            .to_string();
3175        let account = target_account_id(request, &body);
3176        let accounts = self.state.read();
3177        let state = accounts
3178            .get(&account)
3179            .ok_or_else(|| repository_not_found(&name))?;
3180        let repo = state
3181            .repositories
3182            .get(&name)
3183            .ok_or_else(|| repository_not_found(&name))?;
3184        if !repo.images.contains_key(&digest) {
3185            return Err(AwsServiceError::aws_error(
3186                StatusCode::BAD_REQUEST,
3187                "ImageNotFoundException",
3188                format!("Subject image {digest} not found in repository '{name}'"),
3189            ));
3190        }
3191        Ok(AwsResponse::ok_json(json!({
3192            "imageReferrers": [],
3193        })))
3194    }
3195
3196    fn update_image_storage_class(
3197        &self,
3198        request: &AwsRequest,
3199    ) -> Result<AwsResponse, AwsServiceError> {
3200        let body = request.json_body();
3201        let name = req_str(&body, "repositoryName")?.to_string();
3202        let image_id = body
3203            .get("imageId")
3204            .cloned()
3205            .ok_or_else(|| invalid_parameter("Missing imageId"))?;
3206        let target_class = req_str(&body, "targetStorageClass")?.to_string();
3207        if target_class != "STANDARD" && target_class != "ARCHIVE" {
3208            return Err(invalid_parameter(format!(
3209                "Invalid targetStorageClass '{target_class}'. Must be STANDARD or ARCHIVE."
3210            )));
3211        }
3212        let account = target_account_id(request, &body);
3213        let accounts = self.state.read();
3214        let state = accounts
3215            .get(&account)
3216            .ok_or_else(|| repository_not_found(&name))?;
3217        let repo = state
3218            .repositories
3219            .get(&name)
3220            .ok_or_else(|| repository_not_found(&name))?;
3221        if resolve_image_digest(repo, &image_id).is_none() {
3222            return Err(image_not_found(&name, &image_id));
3223        }
3224        Ok(AwsResponse::ok_json(json!({
3225            "registryId": repo.registry_id,
3226            "repositoryName": name,
3227            "imageId": image_id,
3228            "targetStorageClass": target_class,
3229        })))
3230    }
3231}
3232
3233fn validate_account_setting_name(name: &str) -> Result<(), AwsServiceError> {
3234    // Smithy `@length(1, 64)` on AccountSettingName.
3235    if name.is_empty() || name.len() > 64 {
3236        return Err(invalid_parameter(format!(
3237            "Invalid parameter at 'name': '{name}' failed to satisfy constraint: \
3238             Member must have length between 1 and 64"
3239        )));
3240    }
3241    Ok(())
3242}
3243
3244fn validate_pullthrough_prefix(prefix: &str) -> Result<(), AwsServiceError> {
3245    // Smithy @length(2, 30) on PullThroughCacheRuleRepositoryPrefix.
3246    if prefix.len() < 2 || prefix.len() > 30 {
3247        return Err(invalid_parameter(format!(
3248            "Invalid parameter at 'ecrRepositoryPrefix': '{prefix}' failed to satisfy constraint: \
3249             Member must have length between 2 and 30"
3250        )));
3251    }
3252    Ok(())
3253}
3254
3255fn validate_template_prefix(prefix: &str) -> Result<(), AwsServiceError> {
3256    // Smithy `@length(2, 256)` on CreationTemplatePrefixString, plus
3257    // AWS's `ROOT` sentinel that's allowed on any-prefix templates.
3258    if prefix == "ROOT" {
3259        return Ok(());
3260    }
3261    if prefix.len() < 2 || prefix.len() > 256 {
3262        return Err(invalid_parameter(format!(
3263            "Invalid parameter at 'prefix': '{prefix}' failed to satisfy constraint: \
3264             Member must have length between 2 and 256"
3265        )));
3266    }
3267    Ok(())
3268}
3269
3270fn validate_max_results(body: &Value) -> Result<(), AwsServiceError> {
3271    if let Some(n) = body.get("maxResults").and_then(|v| v.as_i64()) {
3272        if !(1..=1000).contains(&n) {
3273            return Err(invalid_parameter(format!(
3274                "Value '{n}' at 'maxResults' failed to satisfy constraint: \
3275                 Member must have value between 1 and 1000"
3276            )));
3277        }
3278    }
3279    Ok(())
3280}
3281
3282fn pull_through_rule_json(registry_id: &str, r: &crate::state::PullThroughCacheRule) -> Value {
3283    pull_through_rule_json_with(registry_id, r, false)
3284}
3285
3286fn pull_through_rule_json_with_updated(
3287    registry_id: &str,
3288    r: &crate::state::PullThroughCacheRule,
3289) -> Value {
3290    pull_through_rule_json_with(registry_id, r, true)
3291}
3292
3293fn pull_through_rule_json_with(
3294    registry_id: &str,
3295    r: &crate::state::PullThroughCacheRule,
3296    include_updated: bool,
3297) -> Value {
3298    let mut out = json!({
3299        "ecrRepositoryPrefix": r.ecr_repository_prefix,
3300        "upstreamRegistryUrl": r.upstream_registry_url,
3301        "createdAt": r.created_at.timestamp(),
3302        "registryId": registry_id,
3303    });
3304    if include_updated {
3305        out["updatedAt"] = json!(r.updated_at.timestamp());
3306    }
3307    if let Some(v) = &r.credential_arn {
3308        out["credentialArn"] = json!(v);
3309    }
3310    if let Some(v) = &r.upstream_registry {
3311        out["upstreamRegistry"] = json!(v);
3312    }
3313    if let Some(v) = &r.custom_role_arn {
3314        out["customRoleArn"] = json!(v);
3315    }
3316    out
3317}
3318
3319fn template_to_json(tpl: &crate::state::RepositoryCreationTemplate) -> Value {
3320    let mut out = json!({
3321        "prefix": tpl.prefix,
3322        "imageTagMutability": tpl.image_tag_mutability,
3323        "appliedFor": tpl.applied_for,
3324        "resourceTags": tpl.resource_tags,
3325        "createdAt": tpl.created_at.timestamp(),
3326        "updatedAt": tpl.updated_at.timestamp(),
3327    });
3328    if let Some(desc) = &tpl.description {
3329        out["description"] = json!(desc);
3330    }
3331    if let Some(arn) = &tpl.custom_role_arn {
3332        out["customRoleArn"] = json!(arn);
3333    }
3334    if let Some(p) = &tpl.repository_policy {
3335        out["repositoryPolicy"] = json!(p);
3336    }
3337    if let Some(p) = &tpl.lifecycle_policy {
3338        out["lifecyclePolicy"] = json!(p);
3339    }
3340    if let Some(enc) = &tpl.encryption_configuration {
3341        let mut e = Map::new();
3342        e.insert("encryptionType".to_string(), json!(enc.encryption_type));
3343        if let Some(k) = &enc.kms_key {
3344            e.insert("kmsKey".to_string(), json!(k));
3345        }
3346        out["encryptionConfiguration"] = Value::Object(e);
3347    }
3348    out
3349}
3350
3351#[cfg(test)]
3352mod tests {
3353    use super::validate_repository_name;
3354
3355    #[track_caller]
3356    fn ok(n: &str) {
3357        validate_repository_name(n).unwrap_or_else(|_| panic!("expected '{n}' to validate"));
3358    }
3359    #[track_caller]
3360    fn bad(n: &str) {
3361        assert!(
3362            validate_repository_name(n).is_err(),
3363            "expected '{n}' to be rejected",
3364        );
3365    }
3366
3367    #[test]
3368    fn accepts_valid_names() {
3369        ok("foo");
3370        ok("foo-bar");
3371        ok("foo.bar");
3372        ok("foo_bar");
3373        ok("foo/bar");
3374        ok("team/svc");
3375        ok("a/b/c");
3376        ok("foo123/bar-baz.qux_q");
3377    }
3378
3379    #[test]
3380    fn rejects_invalid_names() {
3381        bad("");
3382        bad("a");
3383        bad("/foo");
3384        bad("foo/");
3385        bad("foo//bar");
3386        bad("-foo");
3387        bad("foo-");
3388        bad("foo--bar");
3389        bad("foo..bar");
3390        bad("foo__bar");
3391        bad("Foo");
3392        bad("foo bar");
3393        bad("foo!");
3394    }
3395
3396    // ── Lifecycle policy evaluator ─────────────────────────────────
3397    use super::{evaluate_lifecycle_policy, wildcard_match};
3398    use crate::state::{Image, Repository};
3399    use chrono::Utc;
3400    use std::collections::BTreeMap;
3401
3402    fn repo_with_images(entries: &[(&str, &[&str], i64)]) -> Repository {
3403        // entries: (digest, tags, minutes_ago_pushed)
3404        let mut r = Repository::new("test-repo", "arn".into(), "123", "http://localhost");
3405        for (digest, tags, minutes_ago) in entries {
3406            let pushed = Utc::now() - chrono::Duration::minutes(*minutes_ago);
3407            r.images.insert(
3408                (*digest).to_string(),
3409                Image {
3410                    image_digest: (*digest).to_string(),
3411                    image_manifest: String::new(),
3412                    image_manifest_media_type: String::new(),
3413                    artifact_media_type: None,
3414                    image_size_in_bytes: 0,
3415                    image_pushed_at: pushed,
3416                    last_recorded_pull_time: None,
3417                },
3418            );
3419            for t in *tags {
3420                r.image_tags.insert((*t).to_string(), (*digest).to_string());
3421            }
3422        }
3423        r
3424    }
3425
3426    #[test]
3427    fn lifecycle_count_more_than_tagged() {
3428        // Five tagged images; rule says keep newest 2, prune 3.
3429        let r = repo_with_images(&[
3430            ("sha256:a", &["v1"], 50),
3431            ("sha256:b", &["v2"], 40),
3432            ("sha256:c", &["v3"], 30),
3433            ("sha256:d", &["v4"], 20),
3434            ("sha256:e", &["v5"], 10),
3435        ]);
3436        let policy = r#"{"rules":[{
3437            "rulePriority": 1,
3438            "selection": {"tagStatus":"tagged","countType":"imageCountMoreThan","countNumber":2}
3439        }]}"#;
3440        let prune = evaluate_lifecycle_policy(&r, policy);
3441        assert_eq!(prune.len(), 3);
3442        assert!(prune.contains(&"sha256:a".to_string()));
3443        assert!(prune.contains(&"sha256:b".to_string()));
3444        assert!(prune.contains(&"sha256:c".to_string()));
3445    }
3446
3447    #[test]
3448    fn lifecycle_untagged_only() {
3449        let r = repo_with_images(&[("sha256:tagged", &["v1"], 60), ("sha256:untag", &[], 30)]);
3450        let policy = r#"{"rules":[{
3451            "rulePriority": 1,
3452            "selection": {"tagStatus":"untagged","countType":"imageCountMoreThan","countNumber":0}
3453        }]}"#;
3454        let prune = evaluate_lifecycle_policy(&r, policy);
3455        assert_eq!(prune, vec!["sha256:untag".to_string()]);
3456    }
3457
3458    #[test]
3459    fn lifecycle_tag_prefix_list() {
3460        let r = repo_with_images(&[
3461            ("sha256:a", &["dev-1"], 60),
3462            ("sha256:b", &["dev-2"], 50),
3463            ("sha256:c", &["prod-1"], 40),
3464            ("sha256:d", &["prod-2"], 30),
3465        ]);
3466        // Keep newest 1 among dev-*, prune the rest; leave prod-* alone.
3467        let policy = r#"{"rules":[{
3468            "rulePriority": 1,
3469            "selection": {
3470                "tagStatus":"tagged",
3471                "tagPrefixList":["dev-"],
3472                "countType":"imageCountMoreThan",
3473                "countNumber":1
3474            }
3475        }]}"#;
3476        let prune = evaluate_lifecycle_policy(&r, policy);
3477        assert_eq!(prune, vec!["sha256:a".to_string()]);
3478    }
3479
3480    #[test]
3481    fn lifecycle_tag_pattern_list_wildcards() {
3482        let r = repo_with_images(&[
3483            ("sha256:a", &["release-2024-01"], 60),
3484            ("sha256:b", &["release-2024-02"], 50),
3485            ("sha256:c", &["hotfix-2024-02"], 40),
3486        ]);
3487        // Match only `release-*`; prune all of them (countNumber=0).
3488        let policy = r#"{"rules":[{
3489            "rulePriority": 1,
3490            "selection": {
3491                "tagStatus":"tagged",
3492                "tagPatternList":["release-*"],
3493                "countType":"imageCountMoreThan",
3494                "countNumber":0
3495            }
3496        }]}"#;
3497        let prune = evaluate_lifecycle_policy(&r, policy);
3498        assert_eq!(prune.len(), 2);
3499        assert!(prune.contains(&"sha256:a".to_string()));
3500        assert!(prune.contains(&"sha256:b".to_string()));
3501        assert!(!prune.contains(&"sha256:c".to_string()));
3502    }
3503
3504    #[test]
3505    fn lifecycle_since_image_pushed_days() {
3506        let r = repo_with_images(&[
3507            ("sha256:old", &["v1"], 60 * 24 * 10), // 10 days ago
3508            ("sha256:new", &["v2"], 60 * 24),      // 1 day ago
3509        ]);
3510        let policy = r#"{"rules":[{
3511            "rulePriority": 1,
3512            "selection": {
3513                "tagStatus":"any",
3514                "countType":"sinceImagePushed",
3515                "countUnit":"days",
3516                "countNumber":5
3517            }
3518        }]}"#;
3519        let prune = evaluate_lifecycle_policy(&r, policy);
3520        assert_eq!(prune, vec!["sha256:old".to_string()]);
3521    }
3522
3523    #[test]
3524    fn lifecycle_rule_priority_order() {
3525        // Priority 1 keeps newest 2 tagged; priority 2 then prunes all
3526        // remaining tagged > 1 day old. Priority 1 runs first, then 2
3527        // sees fewer candidates.
3528        let r = repo_with_images(&[
3529            ("sha256:a", &["v1"], 60 * 24 * 10),
3530            ("sha256:b", &["v2"], 60 * 24 * 5),
3531            ("sha256:c", &["v3"], 60 * 24 * 2),
3532            ("sha256:d", &["v4"], 60 * 24),
3533        ]);
3534        let policy = r#"{"rules":[
3535            {"rulePriority": 2,
3536             "selection": {"tagStatus":"any","countType":"sinceImagePushed","countUnit":"days","countNumber":3}},
3537            {"rulePriority": 1,
3538             "selection": {"tagStatus":"tagged","countType":"imageCountMoreThan","countNumber":2}}
3539        ]}"#;
3540        let prune: std::collections::BTreeSet<String> =
3541            evaluate_lifecycle_policy(&r, policy).into_iter().collect();
3542        // Priority 1 (runs first): prunes a + b (keeping newest 2 = c, d).
3543        // Priority 2: c and d are both < 3 days -> survives.
3544        assert!(prune.contains("sha256:a"));
3545        assert!(prune.contains("sha256:b"));
3546    }
3547
3548    #[test]
3549    fn wildcard_match_basics() {
3550        assert!(wildcard_match("release-*", "release-2024"));
3551        assert!(wildcard_match("*-stable", "v1-stable"));
3552        assert!(wildcard_match("a*b*c", "a-something-b-more-c"));
3553        assert!(wildcard_match("*", "anything"));
3554        assert!(wildcard_match("exact", "exact"));
3555
3556        assert!(!wildcard_match("release-*", "rev-2024"));
3557        assert!(!wildcard_match("*-stable", "v1-beta"));
3558        assert!(!wildcard_match("exact", "exactly"));
3559        assert!(!wildcard_match("a*b*c", "a-b"));
3560    }
3561
3562    // Suppress clippy::no_effect for BTreeMap usage anchor in this mod.
3563    #[allow(dead_code)]
3564    fn _anchor_btree() -> BTreeMap<String, String> {
3565        BTreeMap::new()
3566    }
3567}