1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19 EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20 Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
/// Names of the ECR API actions this service advertises.
///
/// The slice is returned as-is by `EcrService::supported_actions`, and each
/// entry has a matching arm in the action dispatch inside
/// `EcrService::handle`. Keep the two in sync when adding an action.
const SUPPORTED_ACTIONS: &[&str] = &[
    "CreateRepository",
    "DeleteRepository",
    "DescribeRepositories",
    "PutImageTagMutability",
    "PutImageScanningConfiguration",
    "SetRepositoryPolicy",
    "GetRepositoryPolicy",
    "DeleteRepositoryPolicy",
    "TagResource",
    "UntagResource",
    "ListTagsForResource",
    "PutImage",
    "BatchGetImage",
    "BatchDeleteImage",
    "BatchCheckLayerAvailability",
    "DescribeImages",
    "ListImages",
    "GetDownloadUrlForLayer",
    "InitiateLayerUpload",
    "UploadLayerPart",
    "CompleteLayerUpload",
    "GetAuthorizationToken",
    "PutLifecyclePolicy",
    "GetLifecyclePolicy",
    "DeleteLifecyclePolicy",
    "StartLifecyclePolicyPreview",
    "GetLifecyclePolicyPreview",
    "StartImageScan",
    "DescribeImageScanFindings",
    "DescribeRegistry",
    "GetRegistryPolicy",
    "PutRegistryPolicy",
    "DeleteRegistryPolicy",
    "GetRegistryScanningConfiguration",
    "PutRegistryScanningConfiguration",
    "BatchGetRepositoryScanningConfiguration",
    "PutReplicationConfiguration",
    "DescribeImageReplicationStatus",
    "CreatePullThroughCacheRule",
    "DeletePullThroughCacheRule",
    "DescribePullThroughCacheRules",
    "UpdatePullThroughCacheRule",
    "ValidatePullThroughCacheRule",
    "GetAccountSetting",
    "PutAccountSetting",
    "CreateRepositoryCreationTemplate",
    "DeleteRepositoryCreationTemplate",
    "DescribeRepositoryCreationTemplates",
    "UpdateRepositoryCreationTemplate",
    "GetSigningConfiguration",
    "PutSigningConfiguration",
    "DeleteSigningConfiguration",
    "DescribeImageSigningStatus",
    "RegisterPullTimeUpdateExclusion",
    "DeregisterPullTimeUpdateExclusion",
    "ListPullTimeUpdateExclusions",
    "ListImageReferrers",
    "UpdateImageStorageClass",
];
83
/// ECR request handler: owns a handle to the shared ECR state plus the
/// optional persistence and KMS handles the action handlers use.
pub struct EcrService {
    /// Shared ECR state that the action handlers lock for reading/writing.
    state: SharedEcrState,
    /// Destination for serialized snapshots; when `None`, saving a snapshot
    /// returns immediately without doing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Async mutex held for the duration of each snapshot save so that
    /// concurrent saves run one at a time.
    snapshot_lock: Arc<AsyncMutex<()>>,
    /// Optional KMS state handle, installed through `with_kms`.
    kms_state: Option<fakecloud_kms::SharedKmsState>,
}
94
95impl EcrService {
96 pub fn new(state: SharedEcrState) -> Self {
97 Self {
98 state,
99 snapshot_store: None,
100 snapshot_lock: Arc::new(AsyncMutex::new(())),
101 kms_state: None,
102 }
103 }
104
105 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
106 self.snapshot_store = Some(store);
107 self
108 }
109
110 pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
111 self.kms_state = Some(kms);
112 self
113 }
114
115 pub fn state_handle(&self) -> &SharedEcrState {
120 &self.state
121 }
122
123 pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
126 self.kms_state.as_ref()
127 }
128
129 async fn save_snapshot(&self) {
130 Self::save_snapshot_with(
131 self.state.clone(),
132 self.snapshot_store.clone(),
133 self.snapshot_lock.clone(),
134 )
135 .await
136 }
137
138 pub(crate) async fn save_snapshot_with(
142 state: SharedEcrState,
143 store: Option<Arc<dyn SnapshotStore>>,
144 lock: Arc<AsyncMutex<()>>,
145 ) {
146 let Some(store) = store else {
147 return;
148 };
149 let _guard = lock.lock().await;
150 let snapshot = EcrSnapshot {
151 schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
152 accounts: Some(state.read().clone()),
153 };
154 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
155 let bytes = serde_json::to_vec(&snapshot)
156 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
157 store.save(&bytes)
158 })
159 .await;
160 match join {
161 Ok(Ok(())) => {}
162 Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
163 Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
164 }
165 }
166}
167
168#[async_trait]
169impl AwsService for EcrService {
170 fn service_name(&self) -> &str {
171 "ecr"
172 }
173
174 async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
175 if request
184 .path_segments
185 .first()
186 .map(|s| s == "v2")
187 .unwrap_or(false)
188 {
189 let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
192 && request.path_segments.len() >= 5
193 && request.path_segments[request.path_segments.len() - 2] == "uploads";
194 if !is_blob_upload {
195 if let Some(stream) = request.take_body_stream() {
196 request.body = fakecloud_core::service::drain_request_stream(stream).await?;
197 }
198 }
199 let result = crate::oci::dispatch(self, &request).await;
200 let is_pull_get = request.method == http::Method::GET
206 && request.path_segments.len() >= 3
207 && matches!(
208 request.path_segments[request.path_segments.len() - 2].as_str(),
209 "blobs" | "manifests"
210 );
211 let mutates_oci = is_pull_get
212 || matches!(
213 request.method,
214 http::Method::POST
215 | http::Method::PUT
216 | http::Method::PATCH
217 | http::Method::DELETE
218 );
219 if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
220 self.save_snapshot().await;
221 }
222 return result;
223 }
224
225 if let Some(stream) = request.take_body_stream() {
227 request.body = fakecloud_core::service::drain_request_stream(stream).await?;
228 }
229
230 let mutates = is_mutating(request.action.as_str());
231 let result = match request.action.as_str() {
232 "CreateRepository" => self.create_repository(&request),
233 "DeleteRepository" => self.delete_repository(&request),
234 "DescribeRepositories" => self.describe_repositories(&request),
235 "PutImageTagMutability" => self.put_image_tag_mutability(&request),
236 "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
237 "SetRepositoryPolicy" => self.set_repository_policy(&request),
238 "GetRepositoryPolicy" => self.get_repository_policy(&request),
239 "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
240 "TagResource" => self.tag_resource(&request),
241 "UntagResource" => self.untag_resource(&request),
242 "ListTagsForResource" => self.list_tags_for_resource(&request),
243 "PutImage" => self.put_image(&request),
244 "BatchGetImage" => self.batch_get_image(&request),
245 "BatchDeleteImage" => self.batch_delete_image(&request),
246 "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
247 "DescribeImages" => self.describe_images(&request),
248 "ListImages" => self.list_images(&request),
249 "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
250 "InitiateLayerUpload" => self.initiate_layer_upload(&request),
251 "UploadLayerPart" => self.upload_layer_part(&request),
252 "CompleteLayerUpload" => self.complete_layer_upload(&request),
253 "GetAuthorizationToken" => self.get_authorization_token(&request),
254 "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
255 "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
256 "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
257 "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
258 "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
259 "StartImageScan" => self.start_image_scan(&request),
260 "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
261 "DescribeRegistry" => self.describe_registry(&request),
262 "GetRegistryPolicy" => self.get_registry_policy(&request),
263 "PutRegistryPolicy" => self.put_registry_policy(&request),
264 "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
265 "GetRegistryScanningConfiguration" => {
266 self.get_registry_scanning_configuration(&request)
267 }
268 "PutRegistryScanningConfiguration" => {
269 self.put_registry_scanning_configuration(&request)
270 }
271 "BatchGetRepositoryScanningConfiguration" => {
272 self.batch_get_repository_scanning_configuration(&request)
273 }
274 "PutReplicationConfiguration" => self.put_replication_configuration(&request),
275 "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
276 "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
277 "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
278 "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
279 "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
280 "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
281 "GetAccountSetting" => self.get_account_setting(&request),
282 "PutAccountSetting" => self.put_account_setting(&request),
283 "CreateRepositoryCreationTemplate" => {
284 self.create_repository_creation_template(&request)
285 }
286 "DeleteRepositoryCreationTemplate" => {
287 self.delete_repository_creation_template(&request)
288 }
289 "DescribeRepositoryCreationTemplates" => {
290 self.describe_repository_creation_templates(&request)
291 }
292 "UpdateRepositoryCreationTemplate" => {
293 self.update_repository_creation_template(&request)
294 }
295 "GetSigningConfiguration" => self.get_signing_configuration(&request),
296 "PutSigningConfiguration" => self.put_signing_configuration(&request),
297 "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
298 "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
299 "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
300 "DeregisterPullTimeUpdateExclusion" => {
301 self.deregister_pull_time_update_exclusion(&request)
302 }
303 "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
304 "ListImageReferrers" => self.list_image_referrers(&request),
305 "UpdateImageStorageClass" => self.update_image_storage_class(&request),
306 _ => Err(AwsServiceError::action_not_implemented(
307 self.service_name(),
308 &request.action,
309 )),
310 };
311 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
312 self.save_snapshot().await;
313 }
314 result
315 }
316
317 fn supported_actions(&self) -> &[&str] {
318 SUPPORTED_ACTIONS
319 }
320}
321
322impl EcrService {
327 fn create_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
328 let body = request.json_body();
329 let name = req_str(&body, "repositoryName")?.to_string();
330 validate_repository_name(&name)?;
331 let image_tag_mutability = opt_str(&body, "imageTagMutability")
332 .unwrap_or("MUTABLE")
333 .to_string();
334 if image_tag_mutability != "MUTABLE" && image_tag_mutability != "IMMUTABLE" {
335 return Err(invalid_parameter(format!(
336 "Invalid value for imageTagMutability: {image_tag_mutability}"
337 )));
338 }
339 let scan_on_push = body
340 .get("imageScanningConfiguration")
341 .and_then(|v| v.get("scanOnPush"))
342 .and_then(|v| v.as_bool())
343 .unwrap_or(false);
344 let encryption = body
345 .get("encryptionConfiguration")
346 .map(|v| EncryptionConfiguration {
347 encryption_type: v
348 .get("encryptionType")
349 .and_then(|x| x.as_str())
350 .unwrap_or("AES256")
351 .to_string(),
352 kms_key: v
353 .get("kmsKey")
354 .and_then(|x| x.as_str())
355 .map(|s| s.to_string()),
356 })
357 .unwrap_or_default();
358 let tags = parse_tags(&body);
359
360 let account = target_account_id(request, &body);
361 let mut accounts = self.state.write();
362 let endpoint = accounts.endpoint().to_string();
363 let state = accounts.get_or_create(&account);
364 if state.repositories.contains_key(&name) {
365 return Err(repository_already_exists(&name));
366 }
367 let arn = state.repository_arn(&name);
368 let mut repo = Repository::new(&name, arn, state.registry_id(), &endpoint);
369 repo.image_tag_mutability = image_tag_mutability;
370 repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
371 repo.encryption_configuration = encryption;
372 for (k, v) in tags {
373 repo.tags.insert(k, v);
374 }
375 let response = repository_to_json(&repo);
376 state.repositories.insert(name.clone(), repo);
377 Ok(AwsResponse::ok_json(json!({ "repository": response })))
378 }
379
380 fn delete_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
381 let body = request.json_body();
382 let name = req_str(&body, "repositoryName")?.to_string();
383 let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
384 let account = target_account_id(request, &body);
385
386 let mut accounts = self.state.write();
387 let state = accounts
388 .get_mut(&account)
389 .ok_or_else(|| repository_not_found(&name))?;
390 let repo = state
391 .repositories
392 .get(&name)
393 .ok_or_else(|| repository_not_found(&name))?;
394 let _ = force;
397 let snapshot = repository_to_json(repo);
398 state.repositories.remove(&name);
399 Ok(AwsResponse::ok_json(json!({ "repository": snapshot })))
400 }
401
402 fn describe_repositories(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
403 const DEFAULT_PAGE_SIZE: usize = 100;
405 let body = request.json_body();
406 let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
407 Some(n) => {
408 if !(1..=1000).contains(&n) {
410 return Err(invalid_parameter(format!(
411 "Value '{n}' at 'maxResults' failed to satisfy constraint: \
412 Member must have value between 1 and 1000",
413 )));
414 }
415 n as usize
416 }
417 None => DEFAULT_PAGE_SIZE,
418 };
419 let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
420 Some(raw) => raw.parse::<usize>().map_err(|_| {
421 AwsServiceError::aws_error(
422 StatusCode::BAD_REQUEST,
423 "InvalidContinuationTokenException",
424 "The specified continuation token is not valid.",
425 )
426 })?,
427 None => 0,
428 };
429 let names: Vec<String> = body
430 .get("repositoryNames")
431 .and_then(|v| v.as_array())
432 .map(|arr| {
433 arr.iter()
434 .filter_map(|v| v.as_str().map(str::to_string))
435 .collect()
436 })
437 .unwrap_or_default();
438 let account = target_account_id(request, &body);
439 let accounts = self.state.read();
440 let Some(state) = accounts.get(&account) else {
441 return Ok(AwsResponse::ok_json(json!({ "repositories": [] })));
442 };
443 let mut out: Vec<Value> = Vec::new();
444 let mut next_token: Option<String> = None;
445 if names.is_empty() {
446 let all: Vec<&Repository> = state.repositories.values().collect();
447 let start = offset.min(all.len());
448 let end = (start + max_results).min(all.len());
449 for repo in &all[start..end] {
450 out.push(repository_to_json(repo));
451 }
452 if end < all.len() {
453 next_token = Some(end.to_string());
454 }
455 } else {
456 for n in &names {
457 let repo = state
458 .repositories
459 .get(n)
460 .ok_or_else(|| repository_not_found(n))?;
461 out.push(repository_to_json(repo));
462 }
463 }
464 let mut response = json!({ "repositories": out });
465 if let Some(token) = next_token {
466 response["nextToken"] = json!(token);
467 }
468 Ok(AwsResponse::ok_json(response))
469 }
470
471 fn put_image_tag_mutability(
472 &self,
473 request: &AwsRequest,
474 ) -> Result<AwsResponse, AwsServiceError> {
475 let body = request.json_body();
476 let name = req_str(&body, "repositoryName")?.to_string();
477 let mutability = req_str(&body, "imageTagMutability")?.to_string();
478 if mutability != "MUTABLE" && mutability != "IMMUTABLE" {
479 return Err(invalid_parameter(format!(
480 "Invalid value for imageTagMutability: {mutability}"
481 )));
482 }
483 let account = target_account_id(request, &body);
484 let mut accounts = self.state.write();
485 let state = accounts
486 .get_mut(&account)
487 .ok_or_else(|| repository_not_found(&name))?;
488 let repo = state
489 .repositories
490 .get_mut(&name)
491 .ok_or_else(|| repository_not_found(&name))?;
492 repo.image_tag_mutability = mutability.clone();
493 let registry_id = repo.registry_id.clone();
494 Ok(AwsResponse::ok_json(json!({
495 "registryId": registry_id,
496 "repositoryName": name,
497 "imageTagMutability": mutability,
498 })))
499 }
500
501 fn put_image_scanning_configuration(
502 &self,
503 request: &AwsRequest,
504 ) -> Result<AwsResponse, AwsServiceError> {
505 let body = request.json_body();
506 let name = req_str(&body, "repositoryName")?.to_string();
507 let scan_on_push = body
508 .get("imageScanningConfiguration")
509 .and_then(|v| v.get("scanOnPush"))
510 .and_then(|v| v.as_bool())
511 .ok_or_else(|| invalid_parameter("Missing imageScanningConfiguration.scanOnPush"))?;
512 let account = target_account_id(request, &body);
513 let mut accounts = self.state.write();
514 let state = accounts
515 .get_mut(&account)
516 .ok_or_else(|| repository_not_found(&name))?;
517 let repo = state
518 .repositories
519 .get_mut(&name)
520 .ok_or_else(|| repository_not_found(&name))?;
521 repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
522 let registry_id = repo.registry_id.clone();
523 Ok(AwsResponse::ok_json(json!({
524 "registryId": registry_id,
525 "repositoryName": name,
526 "imageScanningConfiguration": { "scanOnPush": scan_on_push },
527 })))
528 }
529
530 fn set_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
531 let body = request.json_body();
532 let name = req_str(&body, "repositoryName")?.to_string();
533 let policy_text = req_str(&body, "policyText")?.to_string();
534 let account = target_account_id(request, &body);
535 let mut accounts = self.state.write();
536 let state = accounts
537 .get_mut(&account)
538 .ok_or_else(|| repository_not_found(&name))?;
539 let repo = state
540 .repositories
541 .get_mut(&name)
542 .ok_or_else(|| repository_not_found(&name))?;
543 repo.policy = Some(policy_text.clone());
544 let registry_id = repo.registry_id.clone();
545 Ok(AwsResponse::ok_json(json!({
546 "registryId": registry_id,
547 "repositoryName": name,
548 "policyText": policy_text,
549 })))
550 }
551
552 fn get_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
553 let body = request.json_body();
554 let name = req_str(&body, "repositoryName")?.to_string();
555 let account = target_account_id(request, &body);
556 let accounts = self.state.read();
557 let state = accounts
558 .get(&account)
559 .ok_or_else(|| repository_not_found(&name))?;
560 let repo = state
561 .repositories
562 .get(&name)
563 .ok_or_else(|| repository_not_found(&name))?;
564 let policy = repo
565 .policy
566 .clone()
567 .ok_or_else(|| repository_policy_not_found(&name))?;
568 Ok(AwsResponse::ok_json(json!({
569 "registryId": repo.registry_id,
570 "repositoryName": name,
571 "policyText": policy,
572 })))
573 }
574
575 fn delete_repository_policy(
576 &self,
577 request: &AwsRequest,
578 ) -> Result<AwsResponse, AwsServiceError> {
579 let body = request.json_body();
580 let name = req_str(&body, "repositoryName")?.to_string();
581 let account = target_account_id(request, &body);
582 let mut accounts = self.state.write();
583 let state = accounts
584 .get_mut(&account)
585 .ok_or_else(|| repository_not_found(&name))?;
586 let repo = state
587 .repositories
588 .get_mut(&name)
589 .ok_or_else(|| repository_not_found(&name))?;
590 let policy = repo
591 .policy
592 .take()
593 .ok_or_else(|| repository_policy_not_found(&name))?;
594 let registry_id = repo.registry_id.clone();
595 Ok(AwsResponse::ok_json(json!({
596 "registryId": registry_id,
597 "repositoryName": name,
598 "policyText": policy,
599 })))
600 }
601
602 fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603 let body = request.json_body();
604 let arn = req_str(&body, "resourceArn")?.to_string();
605 let (arn_account, name) = decode_resource_arn(&arn)?;
606 let tags = parse_tags(&body);
607 let account = arn_account.unwrap_or_else(|| request.account_id.clone());
608 let mut accounts = self.state.write();
609 let state = accounts
610 .get_mut(&account)
611 .ok_or_else(|| repository_not_found(&name))?;
612 let repo = state
613 .repositories
614 .get_mut(&name)
615 .ok_or_else(|| repository_not_found(&name))?;
616 for (k, v) in tags {
617 repo.tags.insert(k, v);
618 }
619 Ok(AwsResponse::ok_json(json!({})))
620 }
621
622 fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
623 let body = request.json_body();
624 let arn = req_str(&body, "resourceArn")?.to_string();
625 let (arn_account, name) = decode_resource_arn(&arn)?;
626 let keys: Vec<String> = body
627 .get("tagKeys")
628 .and_then(|v| v.as_array())
629 .map(|arr| {
630 arr.iter()
631 .filter_map(|v| v.as_str().map(str::to_string))
632 .collect()
633 })
634 .unwrap_or_default();
635 let account = arn_account.unwrap_or_else(|| request.account_id.clone());
636 let mut accounts = self.state.write();
637 let state = accounts
638 .get_mut(&account)
639 .ok_or_else(|| repository_not_found(&name))?;
640 let repo = state
641 .repositories
642 .get_mut(&name)
643 .ok_or_else(|| repository_not_found(&name))?;
644 for k in keys {
645 repo.tags.remove(&k);
646 }
647 Ok(AwsResponse::ok_json(json!({})))
648 }
649
650 fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
651 let body = request.json_body();
652 let arn = req_str(&body, "resourceArn")?.to_string();
653 let (arn_account, name) = decode_resource_arn(&arn)?;
654 let account = arn_account.unwrap_or_else(|| request.account_id.clone());
655 let accounts = self.state.read();
656 let state = accounts
657 .get(&account)
658 .ok_or_else(|| repository_not_found(&name))?;
659 let repo = state
660 .repositories
661 .get(&name)
662 .ok_or_else(|| repository_not_found(&name))?;
663 let tags: Vec<Value> = repo
664 .tags
665 .iter()
666 .map(|(k, v)| json!({ "Key": k, "Value": v }))
667 .collect();
668 Ok(AwsResponse::ok_json(json!({ "tags": tags })))
669 }
670}
671
672impl EcrService {
677 fn put_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
678 let body = request.json_body();
679 let name = req_str(&body, "repositoryName")?.to_string();
680 let manifest = req_str(&body, "imageManifest")?.to_string();
681 let manifest_media_type = opt_str(&body, "imageManifestMediaType")
682 .unwrap_or("application/vnd.docker.distribution.manifest.v2+json")
683 .to_string();
684 let supplied_tag = opt_str(&body, "imageTag").map(|s| s.to_string());
685 let supplied_digest = opt_str(&body, "imageDigest").map(|s| s.to_string());
686 let account = target_account_id(request, &body);
687
688 let computed_digest = sha256_digest(manifest.as_bytes());
689 if let Some(ref supplied) = supplied_digest {
690 if supplied != &computed_digest {
691 return Err(AwsServiceError::aws_error(
692 StatusCode::BAD_REQUEST,
693 "ImageDigestDoesNotMatchException",
694 format!(
695 "The imageDigest '{supplied}' does not match the digest of the uploaded manifest ('{computed_digest}')."
696 ),
697 ));
698 }
699 }
700 let digest = supplied_digest.unwrap_or_else(|| computed_digest.clone());
701
702 let mut accounts = self.state.write();
703 let state = accounts
704 .get_mut(&account)
705 .ok_or_else(|| repository_not_found(&name))?;
706 let registry_match =
707 registry_scan_on_push_matches(&state.registry_scanning_configuration, &name);
708 let repo = state
709 .repositories
710 .get_mut(&name)
711 .ok_or_else(|| repository_not_found(&name))?;
712
713 check_repo_policy(
714 &account,
715 &request.account_id,
716 &repo.repository_arn,
717 &name,
718 repo.policy.as_deref(),
719 "ecr:PutImage",
720 )?;
721
722 if let Some(ref tag) = supplied_tag {
725 if let Some(existing) = repo.image_tags.get(tag) {
726 if existing != &digest && repo.image_tag_mutability == "IMMUTABLE" {
727 return Err(image_already_exists(&name, tag));
728 }
729 }
730 }
731
732 let image_entry = repo.images.entry(digest.clone()).or_insert_with(|| Image {
733 image_digest: digest.clone(),
734 image_manifest: manifest.clone(),
735 image_manifest_media_type: manifest_media_type.clone(),
736 artifact_media_type: None,
737 image_size_in_bytes: manifest.len() as u64,
738 image_pushed_at: Utc::now(),
739 last_recorded_pull_time: None,
740 image_status: "ACTIVE".to_string(),
741 last_archived_at: None,
742 last_activated_at: None,
743 last_in_use_at: None,
744 in_use_count: 0,
745 });
746 image_entry.image_manifest = manifest;
750 image_entry.image_manifest_media_type = manifest_media_type.clone();
751
752 if let Some(tag) = supplied_tag.clone() {
753 repo.image_tags.insert(tag, digest.clone());
754 }
755
756 let snapshot = repo.images.get(&digest).cloned().unwrap();
757 let scan_on_push = repo.image_scanning_configuration.scan_on_push;
758 let should_scan = scan_on_push || registry_match;
762 let tag_ref = supplied_tag.as_deref();
763 let response = AwsResponse::ok_json(json!({
764 "image": {
765 "registryId": repo.registry_id,
766 "repositoryName": name,
767 "imageId": image_id_for(&snapshot, tag_ref),
768 "imageManifest": snapshot.image_manifest,
769 "imageManifestMediaType": snapshot.image_manifest_media_type,
770 }
771 }));
772 drop(accounts);
775 if should_scan {
776 self.trigger_scan(&account, &name, &digest);
777 }
778 self.replicate_image(&account, &name, &digest);
779 Ok(response)
780 }
781
782 fn replicate_image(&self, source_account: &str, repo_name: &str, digest: &str) {
791 use crate::state::{ImageReplicationStatus, Repository};
792
793 let (rules, image, layer_blobs, source_registry_id, source_region, source_uri) = {
794 let accounts = self.state.read();
795 let Some(state) = accounts.get(source_account) else {
796 return;
797 };
798 let Some(cfg) = state.replication_configuration.as_ref() else {
799 return;
800 };
801 let Some(repo) = state.repositories.get(repo_name) else {
802 return;
803 };
804 let Some(image) = repo.images.get(digest).cloned() else {
805 return;
806 };
807 let layers: Vec<crate::state::Layer> = layers_for_image(repo, digest);
808 (
809 cfg.rules.clone(),
810 image,
811 layers,
812 repo.registry_id.clone(),
813 state.region.clone(),
814 repo.repository_uri.clone(),
815 )
816 };
817
818 let endpoint = source_uri
822 .strip_suffix(&format!("/{repo_name}"))
823 .unwrap_or(&source_uri)
824 .to_string();
825
826 let matching: Vec<_> = rules
827 .into_iter()
828 .filter(|rule| repository_filters_match(&rule.repository_filters, repo_name))
829 .flat_map(|rule| rule.destinations.into_iter())
830 .collect();
831 if matching.is_empty() {
832 return;
833 }
834
835 let mut statuses: Vec<ImageReplicationStatus> = matching
840 .iter()
841 .filter(|dest| {
842 !(dest.registry_id == source_registry_id && dest.region == source_region)
845 })
846 .map(|dest| ImageReplicationStatus {
847 region: dest.region.clone(),
848 registry_id: dest.registry_id.clone(),
849 status: "IN_PROGRESS".to_string(),
850 failure_code: None,
851 failure_reason: None,
852 })
853 .collect();
854
855 if statuses.is_empty() {
856 let mut accounts = self.state.write();
860 let source_state = accounts.get_or_create(source_account);
861 if let Some(repo) = source_state.repositories.get_mut(repo_name) {
862 repo.replication_statuses.remove(digest);
863 }
864 return;
865 }
866
867 let mut accounts = self.state.write();
868 for (idx, dest) in matching.iter().enumerate() {
869 if dest.registry_id == source_registry_id && dest.region == source_region {
870 continue;
871 }
872 let status_idx = matching
875 .iter()
876 .take(idx + 1)
877 .filter(|d| !(d.registry_id == source_registry_id && d.region == source_region))
878 .count()
879 - 1;
880
881 if dest.registry_id != source_registry_id {
887 let target_state = accounts.get_or_create(&dest.registry_id);
888 if !target_state.repositories.contains_key(repo_name) {
893 let arn = format!(
894 "arn:aws:ecr:{}:{}:repository/{}",
895 dest.region, dest.registry_id, repo_name
896 );
897 let repo = Repository::new(repo_name, arn, &dest.registry_id, &endpoint);
898 target_state
899 .repositories
900 .insert(repo_name.to_string(), repo);
901 }
902 let target_repo = target_state.repositories.get_mut(repo_name).unwrap();
903 target_repo
904 .images
905 .entry(digest.to_string())
906 .or_insert_with(|| image.clone());
907 for layer in &layer_blobs {
908 target_repo
909 .layers
910 .entry(layer.digest.clone())
911 .or_insert_with(|| layer.clone());
912 }
913 }
914 statuses[status_idx].status = "COMPLETE".to_string();
915 }
916
917 let source_state = accounts.get_or_create(source_account);
920 if let Some(repo) = source_state.repositories.get_mut(repo_name) {
921 repo.replication_statuses
922 .insert(digest.to_string(), statuses);
923 }
924 }
925
926 fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
931 use crate::state::ImageScanFindings;
932 let layers = {
933 let mut accounts = self.state.write();
934 let Some(state) = accounts.get_mut(account) else {
935 return;
936 };
937 let Some(repo) = state.repositories.get_mut(name) else {
938 return;
939 };
940 repo.scan_findings.insert(
941 digest.to_string(),
942 ImageScanFindings {
943 image_digest: digest.to_string(),
944 scan_status: "IN_PROGRESS".to_string(),
945 scan_completed_at: None,
946 vulnerability_source_updated_at: None,
947 finding_severity_counts: BTreeMap::new(),
948 findings: Vec::new(),
949 },
950 );
951 layers_for_image(repo, digest)
952 };
953 let shared = self.state.clone();
954 let store = self.snapshot_store.clone();
955 let snap_lock = self.snapshot_lock.clone();
956 let account = account.to_string();
957 let name = name.to_string();
958 let digest = digest.to_string();
959 tokio::spawn(async move {
960 let result = crate::scanner::scan_layers(&digest, &layers).await;
961 {
962 let mut accounts = shared.write();
963 let Some(state) = accounts.get_mut(&account) else {
964 return;
965 };
966 let Some(repo) = state.repositories.get_mut(&name) else {
967 return;
968 };
969 let findings = result.unwrap_or_else(|| ImageScanFindings {
970 image_digest: digest.clone(),
971 scan_status: "COMPLETE".to_string(),
972 scan_completed_at: Some(Utc::now()),
973 vulnerability_source_updated_at: Some(Utc::now()),
974 finding_severity_counts: BTreeMap::new(),
975 findings: Vec::new(),
976 });
977 repo.scan_findings.insert(digest.clone(), findings);
978 }
979 EcrService::save_snapshot_with(shared, store, snap_lock).await;
980 });
981 }
982
983 fn batch_get_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
984 let body = request.json_body();
985 let name = req_str(&body, "repositoryName")?.to_string();
986 let ids = body
987 .get("imageIds")
988 .and_then(|v| v.as_array())
989 .cloned()
990 .unwrap_or_default();
991 let account = target_account_id(request, &body);
992 let mut accounts = self.state.write();
997 let state = accounts
998 .get_mut(&account)
999 .ok_or_else(|| repository_not_found(&name))?;
1000 let exclusions = pull_time_exclusion_set(state);
1004 let repo = state
1005 .repositories
1006 .get_mut(&name)
1007 .ok_or_else(|| repository_not_found(&name))?;
1008
1009 check_repo_policy(
1010 &account,
1011 &request.account_id,
1012 &repo.repository_arn,
1013 &name,
1014 repo.policy.as_deref(),
1015 "ecr:BatchGetImage",
1016 )?;
1017
1018 let mut images: Vec<Value> = Vec::new();
1019 let mut failures: Vec<Value> = Vec::new();
1020 let mut hit_digests: Vec<String> = Vec::new();
1021 for id in &ids {
1022 match resolve_image_digest(repo, id) {
1023 Some(digest) => {
1024 let img = repo.images.get(&digest).unwrap();
1025 let tag = id.get("imageTag").and_then(|v| v.as_str());
1026 images.push(json!({
1027 "registryId": repo.registry_id,
1028 "repositoryName": name,
1029 "imageId": image_id_for(img, tag),
1030 "imageManifest": img.image_manifest,
1031 "imageManifestMediaType": img.image_manifest_media_type,
1032 }));
1033 hit_digests.push(digest);
1034 }
1035 None => failures.push(json!({
1036 "imageId": id,
1037 "failureCode": "ImageNotFound",
1038 "failureReason": "Requested image not found",
1039 })),
1040 }
1041 }
1042 let caller_arn = request.principal.as_ref().map(|p| p.arn.as_str());
1043 touch_image_pull(repo, &hit_digests, caller_arn, &exclusions);
1044 Ok(AwsResponse::ok_json(json!({
1045 "images": images,
1046 "failures": failures,
1047 })))
1048 }
1049
1050 fn batch_delete_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1051 let body = request.json_body();
1052 let name = req_str(&body, "repositoryName")?.to_string();
1053 let ids = body
1054 .get("imageIds")
1055 .and_then(|v| v.as_array())
1056 .cloned()
1057 .unwrap_or_default();
1058 let account = target_account_id(request, &body);
1059 let mut accounts = self.state.write();
1060 let state = accounts
1061 .get_mut(&account)
1062 .ok_or_else(|| repository_not_found(&name))?;
1063 let repo = state
1064 .repositories
1065 .get_mut(&name)
1066 .ok_or_else(|| repository_not_found(&name))?;
1067
1068 check_repo_policy(
1069 &account,
1070 &request.account_id,
1071 &repo.repository_arn,
1072 &name,
1073 repo.policy.as_deref(),
1074 "ecr:BatchDeleteImage",
1075 )?;
1076
1077 let mut deleted: Vec<Value> = Vec::new();
1078 let mut failures: Vec<Value> = Vec::new();
1079 for id in &ids {
1080 if let Some(tag) = id.get("imageTag").and_then(|v| v.as_str()) {
1081 if let Some(digest) = repo.image_tags.remove(tag) {
1084 deleted.push(json!({ "imageDigest": digest, "imageTag": tag }));
1085 let still_tagged = repo.image_tags.values().any(|d| *d == digest);
1086 if !still_tagged {
1087 repo.images.remove(&digest);
1088 }
1089 continue;
1090 }
1091 failures.push(json!({
1092 "imageId": id,
1093 "failureCode": "ImageNotFound",
1094 "failureReason": "Requested image not found",
1095 }));
1096 } else if let Some(digest) = id.get("imageDigest").and_then(|v| v.as_str()) {
1097 if repo.images.remove(digest).is_some() {
1098 repo.image_tags.retain(|_, d| d != digest);
1099 deleted.push(json!({ "imageDigest": digest }));
1100 continue;
1101 }
1102 failures.push(json!({
1103 "imageId": id,
1104 "failureCode": "ImageNotFound",
1105 "failureReason": "Requested image not found",
1106 }));
1107 } else {
1108 failures.push(json!({
1109 "imageId": id,
1110 "failureCode": "InvalidImageTag",
1111 "failureReason": "Either imageDigest or imageTag must be supplied",
1112 }));
1113 }
1114 }
1115 Ok(AwsResponse::ok_json(json!({
1116 "imageIds": deleted,
1117 "failures": failures,
1118 })))
1119 }
1120
1121 fn batch_check_layer_availability(
1122 &self,
1123 request: &AwsRequest,
1124 ) -> Result<AwsResponse, AwsServiceError> {
1125 let body = request.json_body();
1126 let name = req_str(&body, "repositoryName")?.to_string();
1127 let digests: Vec<String> = body
1128 .get("layerDigests")
1129 .and_then(|v| v.as_array())
1130 .map(|arr| {
1131 arr.iter()
1132 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1133 .collect()
1134 })
1135 .unwrap_or_default();
1136 if digests.is_empty() {
1137 return Err(invalid_parameter(
1138 "At least one layerDigest must be supplied to BatchCheckLayerAvailability",
1139 ));
1140 }
1141 let account = target_account_id(request, &body);
1142 let accounts = self.state.read();
1143 let state = accounts
1144 .get(&account)
1145 .ok_or_else(|| repository_not_found(&name))?;
1146 let repo = state
1147 .repositories
1148 .get(&name)
1149 .ok_or_else(|| repository_not_found(&name))?;
1150 check_repo_policy(
1151 &account,
1152 &request.account_id,
1153 &repo.repository_arn,
1154 &name,
1155 repo.policy.as_deref(),
1156 "ecr:BatchCheckLayerAvailability",
1157 )?;
1158 let mut layers: Vec<Value> = Vec::new();
1159 let mut failures: Vec<Value> = Vec::new();
1160 for digest in &digests {
1161 match repo.layers.get(digest) {
1162 Some(layer) => layers.push(json!({
1163 "layerDigest": layer.digest,
1164 "layerAvailability": "AVAILABLE",
1165 "layerSize": layer.size,
1166 "mediaType": layer.media_type,
1167 })),
1168 None => failures.push(json!({
1169 "layerDigest": digest,
1170 "failureCode": "MissingLayerDigest",
1171 "failureReason": "Layer not found in repository",
1172 })),
1173 }
1174 }
1175 Ok(AwsResponse::ok_json(json!({
1176 "layers": layers,
1177 "failures": failures,
1178 })))
1179 }
1180
1181 fn describe_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1182 const DEFAULT_PAGE_SIZE: usize = 100;
1183 let body = request.json_body();
1184 let name = req_str(&body, "repositoryName")?.to_string();
1185 let ids = body
1186 .get("imageIds")
1187 .and_then(|v| v.as_array())
1188 .cloned()
1189 .unwrap_or_default();
1190 let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
1191 Some(n) => {
1192 if !(1..=1000).contains(&n) {
1193 return Err(invalid_parameter(format!(
1194 "Value '{n}' at 'maxResults' failed to satisfy constraint: \
1195 Member must have value between 1 and 1000",
1196 )));
1197 }
1198 n as usize
1199 }
1200 None => DEFAULT_PAGE_SIZE,
1201 };
1202 let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
1203 Some(raw) => raw.parse::<usize>().map_err(|_| {
1204 AwsServiceError::aws_error(
1205 StatusCode::BAD_REQUEST,
1206 "InvalidContinuationTokenException",
1207 "The specified continuation token is not valid.",
1208 )
1209 })?,
1210 None => 0,
1211 };
1212 let account = target_account_id(request, &body);
1213 let accounts = self.state.read();
1214 let state = accounts
1215 .get(&account)
1216 .ok_or_else(|| repository_not_found(&name))?;
1217 let repo = state
1218 .repositories
1219 .get(&name)
1220 .ok_or_else(|| repository_not_found(&name))?;
1221
1222 let mut details: Vec<Value> = Vec::new();
1223 let mut next_token: Option<String> = None;
1224 if ids.is_empty() {
1225 let all: Vec<&Image> = repo.images.values().collect();
1226 let start = offset.min(all.len());
1227 let end = (start + max_results).min(all.len());
1228 for img in &all[start..end] {
1229 details.push(image_to_details(repo, img, &repo.registry_id));
1230 }
1231 if end < all.len() {
1232 next_token = Some(end.to_string());
1233 }
1234 } else {
1235 for id in &ids {
1236 let digest =
1237 resolve_image_digest(repo, id).ok_or_else(|| image_not_found(&name, id))?;
1238 let img = repo.images.get(&digest).unwrap();
1239 details.push(image_to_details(repo, img, &repo.registry_id));
1240 }
1241 }
1242 let mut response = json!({ "imageDetails": details });
1243 if let Some(token) = next_token {
1244 response["nextToken"] = json!(token);
1245 }
1246 Ok(AwsResponse::ok_json(response))
1247 }
1248
1249 fn list_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1250 const DEFAULT_PAGE_SIZE: usize = 100;
1251 let body = request.json_body();
1252 let name = req_str(&body, "repositoryName")?.to_string();
1253 let filter_tag_status = body
1254 .get("filter")
1255 .and_then(|v| v.get("tagStatus"))
1256 .and_then(|v| v.as_str())
1257 .map(|s| s.to_string());
1258 let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
1259 Some(n) => {
1260 if !(1..=1000).contains(&n) {
1261 return Err(invalid_parameter(format!(
1262 "Value '{n}' at 'maxResults' failed to satisfy constraint: \
1263 Member must have value between 1 and 1000",
1264 )));
1265 }
1266 n as usize
1267 }
1268 None => DEFAULT_PAGE_SIZE,
1269 };
1270 let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
1271 Some(raw) => raw.parse::<usize>().map_err(|_| {
1272 AwsServiceError::aws_error(
1273 StatusCode::BAD_REQUEST,
1274 "InvalidContinuationTokenException",
1275 "The specified continuation token is not valid.",
1276 )
1277 })?,
1278 None => 0,
1279 };
1280 let account = target_account_id(request, &body);
1281 let accounts = self.state.read();
1282 let state = accounts
1283 .get(&account)
1284 .ok_or_else(|| repository_not_found(&name))?;
1285 let repo = state
1286 .repositories
1287 .get(&name)
1288 .ok_or_else(|| repository_not_found(&name))?;
1289
1290 let mut all: Vec<(String, Option<String>)> = Vec::new();
1292 for (tag, digest) in &repo.image_tags {
1293 all.push((digest.clone(), Some(tag.clone())));
1294 }
1295 let tagged_digests: std::collections::HashSet<&String> = repo.image_tags.values().collect();
1296 for digest in repo.images.keys() {
1297 if !tagged_digests.contains(digest) {
1298 all.push((digest.clone(), None));
1299 }
1300 }
1301 all.retain(|(_, tag)| match filter_tag_status.as_deref() {
1303 Some("TAGGED") => tag.is_some(),
1304 Some("UNTAGGED") => tag.is_none(),
1305 _ => true,
1306 });
1307 all.sort();
1308
1309 let start = offset.min(all.len());
1310 let end = (start + max_results).min(all.len());
1311 let ids: Vec<Value> = all[start..end]
1312 .iter()
1313 .map(|(d, t)| {
1314 let mut v = json!({ "imageDigest": d });
1315 if let Some(tag) = t {
1316 v["imageTag"] = json!(tag);
1317 }
1318 v
1319 })
1320 .collect();
1321 let mut response = json!({ "imageIds": ids });
1322 if end < all.len() {
1323 response["nextToken"] = json!(end.to_string());
1324 }
1325 Ok(AwsResponse::ok_json(response))
1326 }
1327
1328 fn get_download_url_for_layer(
1329 &self,
1330 request: &AwsRequest,
1331 ) -> Result<AwsResponse, AwsServiceError> {
1332 let body = request.json_body();
1333 let name = req_str(&body, "repositoryName")?.to_string();
1334 let digest = req_str(&body, "layerDigest")?.to_string();
1335 let account = target_account_id(request, &body);
1336 let mut accounts = self.state.write();
1337 let state = accounts
1338 .get_mut(&account)
1339 .ok_or_else(|| repository_not_found(&name))?;
1340 let exclusions = pull_time_exclusion_set(state);
1341 let repo = state
1342 .repositories
1343 .get_mut(&name)
1344 .ok_or_else(|| repository_not_found(&name))?;
1345 check_repo_policy(
1346 &account,
1347 &request.account_id,
1348 &repo.repository_arn,
1349 &name,
1350 repo.policy.as_deref(),
1351 "ecr:GetDownloadUrlForLayer",
1352 )?;
1353 if !repo.layers.contains_key(&digest) {
1354 return Err(layer_not_found(&digest, &name));
1355 }
1356 let mut touched: Vec<String> = Vec::new();
1361 for (img_digest, img) in &repo.images {
1362 let parsed: Value = match serde_json::from_str(&img.image_manifest) {
1363 Ok(v) => v,
1364 Err(_) => continue,
1365 };
1366 let references = parsed
1367 .get("layers")
1368 .and_then(|v| v.as_array())
1369 .map(|arr| {
1370 arr.iter()
1371 .any(|l| l.get("digest").and_then(|d| d.as_str()) == Some(digest.as_str()))
1372 })
1373 .unwrap_or(false);
1374 if references {
1375 touched.push(img_digest.clone());
1376 }
1377 }
1378 let caller_arn = request.principal.as_ref().map(|p| p.arn.as_str());
1379 touch_image_pull(repo, &touched, caller_arn, &exclusions);
1380 let endpoint = accounts.endpoint();
1384 let url = format!(
1385 "{}/v2/{}/blobs/{}",
1386 endpoint.trim_end_matches('/'),
1387 name,
1388 digest
1389 );
1390 Ok(AwsResponse::ok_json(json!({
1391 "downloadUrl": url,
1392 "layerDigest": digest,
1393 })))
1394 }
1395
1396 fn initiate_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1397 let body = request.json_body();
1398 let name = req_str(&body, "repositoryName")?.to_string();
1399 let account = target_account_id(request, &body);
1400 let mut accounts = self.state.write();
1401 let state = accounts
1402 .get_mut(&account)
1403 .ok_or_else(|| repository_not_found(&name))?;
1404 let repo = state
1405 .repositories
1406 .get(&name)
1407 .ok_or_else(|| repository_not_found(&name))?;
1408 check_repo_policy(
1409 &account,
1410 &request.account_id,
1411 &repo.repository_arn,
1412 &name,
1413 repo.policy.as_deref(),
1414 "ecr:InitiateLayerUpload",
1415 )?;
1416 let upload_id = Uuid::new_v4().to_string();
1417 let spool = crate::oci::create_upload_spool(&upload_id).map_err(|e| {
1418 AwsServiceError::aws_error(
1419 StatusCode::INTERNAL_SERVER_ERROR,
1420 "InternalError",
1421 format!("failed to create upload spool: {e}"),
1422 )
1423 })?;
1424 state.layer_uploads.insert(
1425 upload_id.clone(),
1426 LayerUpload {
1427 upload_id: upload_id.clone(),
1428 repository_name: name,
1429 created_at: Utc::now(),
1430 spool_path: spool.to_string_lossy().to_string(),
1431 last_byte_received: 0,
1432 },
1433 );
1434 Ok(AwsResponse::ok_json(json!({
1435 "uploadId": upload_id,
1436 "partSize": 10_485_760u64,
1438 })))
1439 }
1440
1441 fn upload_layer_part(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1442 let body = request.json_body();
1443 let name = req_str(&body, "repositoryName")?.to_string();
1444 let upload_id = req_str(&body, "uploadId")?.to_string();
1445 let first_byte = body
1446 .get("partFirstByte")
1447 .and_then(|v| v.as_u64())
1448 .ok_or_else(|| invalid_parameter("Missing partFirstByte"))?;
1449 let last_byte = body
1450 .get("partLastByte")
1451 .and_then(|v| v.as_u64())
1452 .ok_or_else(|| invalid_parameter("Missing partLastByte"))?;
1453 let part_blob_b64 = req_str(&body, "layerPartBlob")?.to_string();
1454 let part_bytes = B64
1455 .decode(part_blob_b64.as_bytes())
1456 .map_err(|_| invalid_layer("layerPartBlob is not valid base64"))?;
1457 let account = target_account_id(request, &body);
1458 let mut accounts = self.state.write();
1459 let state = accounts
1460 .get_mut(&account)
1461 .ok_or_else(|| repository_not_found(&name))?;
1462 let repo = state
1463 .repositories
1464 .get(&name)
1465 .ok_or_else(|| repository_not_found(&name))?;
1466 check_repo_policy(
1467 &account,
1468 &request.account_id,
1469 &repo.repository_arn,
1470 &name,
1471 repo.policy.as_deref(),
1472 "ecr:UploadLayerPart",
1473 )?;
1474 let upload = state
1475 .layer_uploads
1476 .get_mut(&upload_id)
1477 .ok_or_else(|| upload_not_found(&upload_id))?;
1478 if upload.repository_name != name {
1479 return Err(upload_not_found(&upload_id));
1480 }
1481 if first_byte != upload.last_byte_received {
1482 return Err(invalid_layer(format!(
1483 "Layer part upload out of order: expected partFirstByte {} got {}",
1484 upload.last_byte_received, first_byte,
1485 )));
1486 }
1487 let expected_len = last_byte
1488 .checked_sub(first_byte)
1489 .and_then(|d| d.checked_add(1))
1490 .ok_or_else(|| invalid_layer("partLastByte < partFirstByte"))?;
1491 if part_bytes.len() as u64 != expected_len {
1492 return Err(invalid_layer(format!(
1493 "Layer part size mismatch: bytes {} doesn't match range [{first_byte}, {last_byte}]",
1494 part_bytes.len()
1495 )));
1496 }
1497 let spool = std::path::PathBuf::from(&upload.spool_path);
1498 crate::oci::append_bytes_sync(&spool, &part_bytes).map_err(|e| {
1499 AwsServiceError::aws_error(
1500 StatusCode::INTERNAL_SERVER_ERROR,
1501 "InternalError",
1502 format!("failed to append upload chunk: {e}"),
1503 )
1504 })?;
1505 upload.last_byte_received = last_byte + 1;
1506 Ok(AwsResponse::ok_json(json!({
1507 "registryId": state.registry_id(),
1508 "repositoryName": name,
1509 "uploadId": upload_id,
1510 "lastByteReceived": last_byte,
1511 })))
1512 }
1513
1514 fn get_authorization_token(
1515 &self,
1516 request: &AwsRequest,
1517 ) -> Result<AwsResponse, AwsServiceError> {
1518 let body = request.json_body();
1519 let registry_ids: Vec<String> = body
1520 .get("registryIds")
1521 .and_then(|v| v.as_array())
1522 .map(|arr| {
1523 arr.iter()
1524 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1525 .collect()
1526 })
1527 .unwrap_or_default();
1528 let accounts = self.state.read();
1529 let default_account = accounts.default_account_id().to_string();
1530 let targets = if registry_ids.is_empty() {
1531 vec![default_account]
1532 } else {
1533 registry_ids
1534 };
1535 let endpoint = accounts.endpoint().to_string();
1536 drop(accounts);
1537 let expires_at = (Utc::now() + chrono::Duration::hours(12)).timestamp();
1538 let authorization_data: Vec<Value> = targets
1539 .into_iter()
1540 .map(|_registry_id| {
1541 let token = B64.encode(format!("AWS:{}", Uuid::new_v4()).as_bytes());
1542 json!({
1543 "authorizationToken": token,
1544 "expiresAt": expires_at,
1545 "proxyEndpoint": endpoint,
1546 })
1547 })
1548 .collect();
1549 Ok(AwsResponse::ok_json(json!({
1550 "authorizationData": authorization_data,
1551 })))
1552 }
1553
1554 fn complete_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1555 let body = request.json_body();
1556 let name = req_str(&body, "repositoryName")?.to_string();
1557 let upload_id = req_str(&body, "uploadId")?.to_string();
1558 let digests: Vec<String> = body
1559 .get("layerDigests")
1560 .and_then(|v| v.as_array())
1561 .map(|arr| {
1562 arr.iter()
1563 .filter_map(|v| v.as_str().map(|s| s.to_string()))
1564 .collect()
1565 })
1566 .unwrap_or_default();
1567 if digests.is_empty() {
1568 return Err(invalid_parameter(
1569 "At least one layerDigest must be supplied to CompleteLayerUpload",
1570 ));
1571 }
1572 let account = target_account_id(request, &body);
1573 let mut accounts = self.state.write();
1574 let state = accounts
1575 .get_mut(&account)
1576 .ok_or_else(|| repository_not_found(&name))?;
1577 let repo = state
1578 .repositories
1579 .get(&name)
1580 .ok_or_else(|| repository_not_found(&name))?;
1581 check_repo_policy(
1582 &account,
1583 &request.account_id,
1584 &repo.repository_arn,
1585 &name,
1586 repo.policy.as_deref(),
1587 "ecr:CompleteLayerUpload",
1588 )?;
1589 let upload = state
1593 .layer_uploads
1594 .get(&upload_id)
1595 .ok_or_else(|| upload_not_found(&upload_id))?;
1596 if upload.repository_name != name {
1597 return Err(upload_not_found(&upload_id));
1598 }
1599 let spool = std::path::PathBuf::from(&upload.spool_path);
1600 let blob_bytes = crate::oci::read_spool(&spool).map_err(|e| {
1601 AwsServiceError::aws_error(
1602 StatusCode::INTERNAL_SERVER_ERROR,
1603 "InternalError",
1604 format!("failed to read upload spool: {e}"),
1605 )
1606 })?;
1607 let computed = sha256_digest(&blob_bytes);
1608 if !digests.iter().any(|d| d == &computed) {
1609 return Err(AwsServiceError::aws_error(
1612 StatusCode::BAD_REQUEST,
1613 "LayerDigestMismatchException",
1614 format!(
1615 "The layer digest from the client ({}) does not match the digest of the received bytes ({computed})",
1616 digests.join(",")
1617 ),
1618 ));
1619 }
1620 let _upload = state.layer_uploads.remove(&upload_id).unwrap();
1621 crate::oci::unlink_spool(&spool);
1622 let size = blob_bytes.len() as u64;
1623 drop(accounts);
1626 let (stored_bytes, encrypted_with) =
1627 crate::oci::encrypt_layer_bytes(self, &account, &name, &blob_bytes);
1628 let mut accounts = self.state.write();
1629 let state = accounts
1630 .get_mut(&account)
1631 .ok_or_else(|| repository_not_found(&name))?;
1632 let repo = state
1633 .repositories
1634 .get_mut(&name)
1635 .ok_or_else(|| repository_not_found(&name))?;
1636 repo.layers.insert(
1637 computed.clone(),
1638 Layer {
1639 digest: computed.clone(),
1640 size,
1641 blob_b64: B64.encode(&stored_bytes),
1642 media_type: "application/vnd.docker.image.rootfs.diff.tar.gzip".to_string(),
1643 encrypted_with_kms_key: encrypted_with,
1644 },
1645 );
1646 let registry_id = repo.registry_id.clone();
1647 Ok(AwsResponse::ok_json(json!({
1648 "registryId": registry_id,
1649 "repositoryName": name,
1650 "uploadId": upload_id,
1651 "layerDigest": computed,
1652 })))
1653 }
1654}
1655
1656impl EcrService {
1659 fn put_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1660 let body = request.json_body();
1661 let name = req_str(&body, "repositoryName")?.to_string();
1662 let policy = req_str(&body, "lifecyclePolicyText")?.to_string();
1663 serde_json::from_str::<Value>(&policy)
1665 .map_err(|_| invalid_parameter("lifecyclePolicyText is not valid JSON"))?;
1666 let account = target_account_id(request, &body);
1667 let mut accounts = self.state.write();
1668 let state = accounts
1669 .get_mut(&account)
1670 .ok_or_else(|| repository_not_found(&name))?;
1671 let repo = state
1672 .repositories
1673 .get_mut(&name)
1674 .ok_or_else(|| repository_not_found(&name))?;
1675 repo.lifecycle_policy = Some(policy.clone());
1676 let prune = evaluate_lifecycle_policy(repo, &policy);
1678 for digest in &prune {
1679 repo.images.remove(digest);
1680 repo.image_tags.retain(|_, d| d != digest);
1681 }
1682 repo.lifecycle_policy_last_evaluated_at = Some(Utc::now());
1683 let registry_id = repo.registry_id.clone();
1684 Ok(AwsResponse::ok_json(json!({
1685 "registryId": registry_id,
1686 "repositoryName": name,
1687 "lifecyclePolicyText": policy,
1688 })))
1689 }
1690
1691 fn get_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1692 let body = request.json_body();
1693 let name = req_str(&body, "repositoryName")?.to_string();
1694 let account = target_account_id(request, &body);
1695 let accounts = self.state.read();
1696 let state = accounts
1697 .get(&account)
1698 .ok_or_else(|| repository_not_found(&name))?;
1699 let repo = state
1700 .repositories
1701 .get(&name)
1702 .ok_or_else(|| repository_not_found(&name))?;
1703 let policy = repo
1704 .lifecycle_policy
1705 .clone()
1706 .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1707 let last_eval = repo
1708 .lifecycle_policy_last_evaluated_at
1709 .map(|t| t.timestamp())
1710 .unwrap_or(0);
1711 Ok(AwsResponse::ok_json(json!({
1712 "registryId": repo.registry_id,
1713 "repositoryName": name,
1714 "lifecyclePolicyText": policy,
1715 "lastEvaluatedAt": last_eval,
1716 })))
1717 }
1718
1719 fn delete_lifecycle_policy(
1720 &self,
1721 request: &AwsRequest,
1722 ) -> Result<AwsResponse, AwsServiceError> {
1723 let body = request.json_body();
1724 let name = req_str(&body, "repositoryName")?.to_string();
1725 let account = target_account_id(request, &body);
1726 let mut accounts = self.state.write();
1727 let state = accounts
1728 .get_mut(&account)
1729 .ok_or_else(|| repository_not_found(&name))?;
1730 let repo = state
1731 .repositories
1732 .get_mut(&name)
1733 .ok_or_else(|| repository_not_found(&name))?;
1734 let policy = repo
1735 .lifecycle_policy
1736 .take()
1737 .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1738 let last_eval = repo
1739 .lifecycle_policy_last_evaluated_at
1740 .take()
1741 .map(|t| t.timestamp())
1742 .unwrap_or(0);
1743 let registry_id = repo.registry_id.clone();
1744 Ok(AwsResponse::ok_json(json!({
1745 "registryId": registry_id,
1746 "repositoryName": name,
1747 "lifecyclePolicyText": policy,
1748 "lastEvaluatedAt": last_eval,
1749 })))
1750 }
1751
1752 fn start_lifecycle_policy_preview(
1753 &self,
1754 request: &AwsRequest,
1755 ) -> Result<AwsResponse, AwsServiceError> {
1756 let body = request.json_body();
1757 let name = req_str(&body, "repositoryName")?.to_string();
1758 let account = target_account_id(request, &body);
1759 let policy = match opt_str(&body, "lifecyclePolicyText") {
1760 Some(s) => s.to_string(),
1761 None => {
1762 let accounts = self.state.read();
1763 let state = accounts
1764 .get(&account)
1765 .ok_or_else(|| repository_not_found(&name))?;
1766 let repo = state
1767 .repositories
1768 .get(&name)
1769 .ok_or_else(|| repository_not_found(&name))?;
1770 repo.lifecycle_policy
1771 .clone()
1772 .ok_or_else(|| lifecycle_policy_not_found(&name))?
1773 }
1774 };
1775 let mut accounts = self.state.write();
1780 let state = accounts
1781 .get_mut(&account)
1782 .ok_or_else(|| repository_not_found(&name))?;
1783 let repo = state
1784 .repositories
1785 .get_mut(&name)
1786 .ok_or_else(|| repository_not_found(&name))?;
1787 let _prune = evaluate_lifecycle_policy(repo, &policy);
1788 repo.lifecycle_policy_last_evaluated_at = Some(Utc::now());
1789 let registry_id = repo.registry_id.clone();
1790 Ok(AwsResponse::ok_json(json!({
1791 "registryId": registry_id,
1792 "repositoryName": name,
1793 "lifecyclePolicyText": policy,
1794 "status": "COMPLETE",
1795 })))
1796 }
1797
1798 fn get_lifecycle_policy_preview(
1799 &self,
1800 request: &AwsRequest,
1801 ) -> Result<AwsResponse, AwsServiceError> {
1802 let body = request.json_body();
1803 let name = req_str(&body, "repositoryName")?.to_string();
1804 let account = target_account_id(request, &body);
1805 let accounts = self.state.read();
1806 let state = accounts
1807 .get(&account)
1808 .ok_or_else(|| repository_not_found(&name))?;
1809 let repo = state
1810 .repositories
1811 .get(&name)
1812 .ok_or_else(|| repository_not_found(&name))?;
1813 let policy = repo
1814 .lifecycle_policy
1815 .clone()
1816 .ok_or_else(|| lifecycle_policy_not_found(&name))?;
1817 let prune = evaluate_lifecycle_policy(repo, &policy);
1818 let results: Vec<Value> = prune
1819 .iter()
1820 .map(|digest| {
1821 json!({
1822 "imageDigest": digest,
1823 "imagePushedAt": repo.images.get(digest).map(|i| i.image_pushed_at.timestamp()).unwrap_or(0),
1824 "action": {"type": "EXPIRE"},
1825 })
1826 })
1827 .collect();
1828 Ok(AwsResponse::ok_json(json!({
1829 "registryId": repo.registry_id,
1830 "repositoryName": name,
1831 "lifecyclePolicyText": policy,
1832 "status": "COMPLETE",
1833 "previewResults": results,
1834 "summary": {"expiringImageTotalCount": prune.len()},
1835 })))
1836 }
1837
1838 fn start_image_scan(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1839 use crate::state::ImageScanFindings;
1840 let body = request.json_body();
1841 let name = req_str(&body, "repositoryName")?.to_string();
1842 let image_id = body
1843 .get("imageId")
1844 .cloned()
1845 .ok_or_else(|| invalid_parameter("Missing imageId"))?;
1846 let account = target_account_id(request, &body);
1847 let (digest, layers, registry_id) = {
1848 let mut accounts = self.state.write();
1849 let state = accounts
1850 .get_mut(&account)
1851 .ok_or_else(|| repository_not_found(&name))?;
1852 let repo = state
1853 .repositories
1854 .get_mut(&name)
1855 .ok_or_else(|| repository_not_found(&name))?;
1856 let digest = resolve_image_digest(repo, &image_id)
1857 .ok_or_else(|| image_not_found(&name, &image_id))?;
1858 repo.scan_findings.insert(
1862 digest.clone(),
1863 ImageScanFindings {
1864 image_digest: digest.clone(),
1865 scan_status: "IN_PROGRESS".to_string(),
1866 scan_completed_at: None,
1867 vulnerability_source_updated_at: None,
1868 finding_severity_counts: BTreeMap::new(),
1869 findings: Vec::new(),
1870 },
1871 );
1872 let layers = layers_for_image(repo, &digest);
1878 (digest, layers, repo.registry_id.clone())
1879 };
1880
1881 let shared = self.state.clone();
1882 let store = self.snapshot_store.clone();
1883 let snap_lock = self.snapshot_lock.clone();
1884 let account_for_task = account.clone();
1885 let name_for_task = name.clone();
1886 let digest_for_task = digest.clone();
1887 tokio::spawn(async move {
1888 let result = crate::scanner::scan_layers(&digest_for_task, &layers).await;
1889 {
1890 let mut accounts = shared.write();
1891 let Some(state) = accounts.get_mut(&account_for_task) else {
1892 return;
1893 };
1894 let Some(repo) = state.repositories.get_mut(&name_for_task) else {
1895 return;
1896 };
1897 let findings = result.unwrap_or_else(|| ImageScanFindings {
1898 image_digest: digest_for_task.clone(),
1899 scan_status: "COMPLETE".to_string(),
1900 scan_completed_at: Some(Utc::now()),
1901 vulnerability_source_updated_at: Some(Utc::now()),
1902 finding_severity_counts: BTreeMap::new(),
1903 findings: Vec::new(),
1904 });
1905 repo.scan_findings.insert(digest_for_task.clone(), findings);
1906 }
1907 EcrService::save_snapshot_with(shared, store, snap_lock).await;
1911 });
1912
1913 Ok(AwsResponse::ok_json(json!({
1914 "registryId": registry_id,
1915 "repositoryName": name,
1916 "imageId": image_id,
1917 "imageScanStatus": {"status": "IN_PROGRESS"},
1918 })))
1919 }
1920
1921 fn describe_image_scan_findings(
1922 &self,
1923 request: &AwsRequest,
1924 ) -> Result<AwsResponse, AwsServiceError> {
1925 let body = request.json_body();
1926 let name = req_str(&body, "repositoryName")?.to_string();
1927 let image_id = body
1928 .get("imageId")
1929 .cloned()
1930 .ok_or_else(|| invalid_parameter("Missing imageId"))?;
1931 let account = target_account_id(request, &body);
1932 let accounts = self.state.read();
1933 let state = accounts
1934 .get(&account)
1935 .ok_or_else(|| repository_not_found(&name))?;
1936 let repo = state
1937 .repositories
1938 .get(&name)
1939 .ok_or_else(|| repository_not_found(&name))?;
1940 let digest = resolve_image_digest(repo, &image_id)
1941 .ok_or_else(|| image_not_found(&name, &image_id))?;
1942 let findings = repo.scan_findings.get(&digest).cloned().unwrap_or_else(|| {
1943 crate::state::ImageScanFindings {
1944 image_digest: digest.clone(),
1945 scan_status: "COMPLETE".to_string(),
1946 scan_completed_at: Some(Utc::now()),
1947 vulnerability_source_updated_at: Some(Utc::now()),
1948 finding_severity_counts: BTreeMap::new(),
1949 findings: Vec::new(),
1950 }
1951 });
1952 Ok(AwsResponse::ok_json(json!({
1953 "registryId": repo.registry_id,
1954 "repositoryName": name,
1955 "imageId": image_id,
1956 "imageScanStatus": {"status": findings.scan_status},
1957 "imageScanFindings": {
1958 "imageScanCompletedAt": findings.scan_completed_at.map(|t| t.timestamp()),
1959 "vulnerabilitySourceUpdatedAt": findings.vulnerability_source_updated_at.map(|t| t.timestamp()),
1960 "findings": findings.findings,
1961 "findingSeverityCounts": findings.finding_severity_counts,
1962 },
1963 })))
1964 }
1965
1966 fn describe_registry(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1967 let body = request.json_body();
1968 let account = target_account_id(request, &body);
1969 let accounts = self.state.read();
1970 let state = accounts.get(&account);
1971 let registry_id = state
1972 .map(|s| s.account_id.clone())
1973 .unwrap_or_else(|| account.clone());
1974 let rules = state
1975 .and_then(|s| s.replication_configuration.as_ref())
1976 .map(|cfg| {
1977 cfg.rules
1978 .iter()
1979 .map(|r| {
1980 json!({
1981 "destinations": r.destinations.iter().map(|d| json!({
1982 "region": d.region,
1983 "registryId": d.registry_id,
1984 })).collect::<Vec<_>>(),
1985 "repositoryFilters": r.repository_filters.iter().map(|f| json!({
1986 "filter": f.filter,
1987 "filterType": f.filter_type,
1988 })).collect::<Vec<_>>(),
1989 })
1990 })
1991 .collect::<Vec<_>>()
1992 })
1993 .unwrap_or_default();
1994 Ok(AwsResponse::ok_json(json!({
1995 "registryId": registry_id,
1996 "replicationConfiguration": {"rules": rules},
1997 })))
1998 }
1999
2000 fn get_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2001 let body = request.json_body();
2002 let account = target_account_id(request, &body);
2003 let accounts = self.state.read();
2004 let state = accounts
2005 .get(&account)
2006 .ok_or_else(registry_policy_not_found)?;
2007 let policy = state
2008 .registry_policy
2009 .clone()
2010 .ok_or_else(registry_policy_not_found)?;
2011 Ok(AwsResponse::ok_json(json!({
2012 "registryId": state.account_id,
2013 "policyText": policy,
2014 })))
2015 }
2016
2017 fn put_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2018 let body = request.json_body();
2019 let policy = req_str(&body, "policyText")?.to_string();
2020 if policy.len() > 10_240 {
2021 return Err(invalid_parameter(format!(
2022 "Value at 'policyText' failed to satisfy constraint: \
2023 Member must have length less than or equal to 10240 (got {})",
2024 policy.len()
2025 )));
2026 }
2027 let account = target_account_id(request, &body);
2028 let mut accounts = self.state.write();
2029 let state = accounts.get_or_create(&account);
2030 state.registry_policy = Some(policy.clone());
2031 Ok(AwsResponse::ok_json(json!({
2032 "registryId": state.account_id,
2033 "policyText": policy,
2034 })))
2035 }
2036
2037 fn delete_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2038 let body = request.json_body();
2039 let account = target_account_id(request, &body);
2040 let mut accounts = self.state.write();
2041 let state = accounts
2042 .get_mut(&account)
2043 .ok_or_else(registry_policy_not_found)?;
2044 let policy = state
2045 .registry_policy
2046 .take()
2047 .ok_or_else(registry_policy_not_found)?;
2048 Ok(AwsResponse::ok_json(json!({
2049 "registryId": state.account_id,
2050 "policyText": policy,
2051 })))
2052 }
2053
2054 fn get_registry_scanning_configuration(
2055 &self,
2056 request: &AwsRequest,
2057 ) -> Result<AwsResponse, AwsServiceError> {
2058 let body = request.json_body();
2059 let account = target_account_id(request, &body);
2060 let accounts = self.state.read();
2061 let state = accounts.get(&account);
2062 let cfg = state
2063 .map(|s| s.registry_scanning_configuration.clone())
2064 .unwrap_or_default();
2065 let rules: Vec<Value> = cfg
2066 .rules
2067 .iter()
2068 .map(|r| {
2069 json!({
2070 "scanFrequency": r.scan_frequency,
2071 "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2072 "filter": f.filter,
2073 "filterType": f.filter_type,
2074 })).collect::<Vec<_>>(),
2075 })
2076 })
2077 .collect();
2078 Ok(AwsResponse::ok_json(json!({
2079 "registryId": state.map(|s| s.account_id.clone()).unwrap_or(account),
2080 "scanningConfiguration": {
2081 "scanType": cfg.scan_type,
2082 "rules": rules,
2083 },
2084 })))
2085 }
2086
2087 fn put_registry_scanning_configuration(
2088 &self,
2089 request: &AwsRequest,
2090 ) -> Result<AwsResponse, AwsServiceError> {
2091 use crate::state::{RegistryScanningConfiguration, RegistryScanningRule, RepositoryFilter};
2092 let body = request.json_body();
2093 let scan_type = opt_str(&body, "scanType").unwrap_or("BASIC").to_string();
2094 if scan_type != "BASIC" && scan_type != "ENHANCED" {
2095 return Err(invalid_parameter(format!(
2096 "Invalid scanType '{scan_type}'. Must be BASIC or ENHANCED."
2097 )));
2098 }
2099 let rules = body
2100 .get("rules")
2101 .and_then(|v| v.as_array())
2102 .cloned()
2103 .unwrap_or_default();
2104 let parsed_rules: Vec<RegistryScanningRule> = rules
2105 .iter()
2106 .map(|r| RegistryScanningRule {
2107 scan_frequency: r
2108 .get("scanFrequency")
2109 .and_then(|v| v.as_str())
2110 .unwrap_or("SCAN_ON_PUSH")
2111 .to_string(),
2112 repository_filters: r
2113 .get("repositoryFilters")
2114 .and_then(|v| v.as_array())
2115 .map(|arr| {
2116 arr.iter()
2117 .map(|f| RepositoryFilter {
2118 filter: f
2119 .get("filter")
2120 .and_then(|v| v.as_str())
2121 .unwrap_or("")
2122 .to_string(),
2123 filter_type: f
2124 .get("filterType")
2125 .and_then(|v| v.as_str())
2126 .unwrap_or("WILDCARD")
2127 .to_string(),
2128 })
2129 .collect()
2130 })
2131 .unwrap_or_default(),
2132 })
2133 .collect();
2134 let account = target_account_id(request, &body);
2135 let mut accounts = self.state.write();
2136 let state = accounts.get_or_create(&account);
2137 state.registry_scanning_configuration = RegistryScanningConfiguration {
2138 scan_type: scan_type.clone(),
2139 rules: parsed_rules,
2140 };
2141 let cfg = state.registry_scanning_configuration.clone();
2142 Ok(AwsResponse::ok_json(json!({
2143 "registryScanningConfiguration": {
2144 "scanType": cfg.scan_type,
2145 "rules": cfg.rules.iter().map(|r| json!({
2146 "scanFrequency": r.scan_frequency,
2147 "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2148 "filter": f.filter,
2149 "filterType": f.filter_type,
2150 })).collect::<Vec<_>>(),
2151 })).collect::<Vec<_>>(),
2152 },
2153 })))
2154 }
2155
2156 fn batch_get_repository_scanning_configuration(
2157 &self,
2158 request: &AwsRequest,
2159 ) -> Result<AwsResponse, AwsServiceError> {
2160 let body = request.json_body();
2161 let names: Vec<String> = body
2162 .get("repositoryNames")
2163 .and_then(|v| v.as_array())
2164 .ok_or_else(|| invalid_parameter("Missing required field: repositoryNames"))?
2165 .iter()
2166 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2167 .collect();
2168 let account = target_account_id(request, &body);
2169 let accounts = self.state.read();
2170 let state = accounts
2171 .get(&account)
2172 .ok_or_else(|| repository_not_found(&account))?;
2173 let mut scanning: Vec<Value> = Vec::new();
2174 let mut failures: Vec<Value> = Vec::new();
2175 for n in &names {
2176 match state.repositories.get(n) {
2177 Some(repo) => scanning.push(json!({
2178 "repositoryArn": repo.repository_arn,
2179 "repositoryName": n,
2180 "scanOnPush": repo.image_scanning_configuration.scan_on_push,
2181 "scanFrequency": "SCAN_ON_PUSH",
2182 "appliedScanFilters": [],
2183 })),
2184 None => failures.push(json!({
2185 "repositoryName": n,
2186 "failureCode": "REPOSITORY_NOT_FOUND",
2187 "failureReason": format!("Repository '{n}' not found"),
2188 })),
2189 }
2190 }
2191 Ok(AwsResponse::ok_json(json!({
2192 "scanningConfigurations": scanning,
2193 "failures": failures,
2194 })))
2195 }
2196
2197 fn put_replication_configuration(
2198 &self,
2199 request: &AwsRequest,
2200 ) -> Result<AwsResponse, AwsServiceError> {
2201 use crate::state::{
2202 ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
2203 };
2204 let body = request.json_body();
2205 let cfg_value = body
2206 .get("replicationConfiguration")
2207 .cloned()
2208 .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
2209 let rules_value = cfg_value
2210 .get("rules")
2211 .and_then(|v| v.as_array())
2212 .cloned()
2213 .unwrap_or_default();
2214 let rules: Vec<ReplicationRule> = rules_value
2215 .iter()
2216 .map(|r| ReplicationRule {
2217 destinations: r
2218 .get("destinations")
2219 .and_then(|v| v.as_array())
2220 .map(|arr| {
2221 arr.iter()
2222 .map(|d| ReplicationDestination {
2223 region: d
2224 .get("region")
2225 .and_then(|v| v.as_str())
2226 .unwrap_or("")
2227 .to_string(),
2228 registry_id: d
2229 .get("registryId")
2230 .and_then(|v| v.as_str())
2231 .unwrap_or("")
2232 .to_string(),
2233 })
2234 .collect()
2235 })
2236 .unwrap_or_default(),
2237 repository_filters: r
2238 .get("repositoryFilters")
2239 .and_then(|v| v.as_array())
2240 .map(|arr| {
2241 arr.iter()
2242 .map(|f| RepositoryFilter {
2243 filter: f
2244 .get("filter")
2245 .and_then(|v| v.as_str())
2246 .unwrap_or("")
2247 .to_string(),
2248 filter_type: f
2249 .get("filterType")
2250 .and_then(|v| v.as_str())
2251 .unwrap_or("PREFIX_MATCH")
2252 .to_string(),
2253 })
2254 .collect()
2255 })
2256 .unwrap_or_default(),
2257 })
2258 .collect();
2259 let account = target_account_id(request, &body);
2260 let mut accounts = self.state.write();
2261 let state = accounts.get_or_create(&account);
2262 state.replication_configuration = Some(ReplicationConfiguration { rules });
2263 let cfg = state.replication_configuration.clone().unwrap();
2264 Ok(AwsResponse::ok_json(json!({
2265 "replicationConfiguration": {
2266 "rules": cfg.rules.iter().map(|r| json!({
2267 "destinations": r.destinations.iter().map(|d| json!({
2268 "region": d.region,
2269 "registryId": d.registry_id,
2270 })).collect::<Vec<_>>(),
2271 "repositoryFilters": r.repository_filters.iter().map(|f| json!({
2272 "filter": f.filter,
2273 "filterType": f.filter_type,
2274 })).collect::<Vec<_>>(),
2275 })).collect::<Vec<_>>(),
2276 },
2277 })))
2278 }
2279
2280 fn describe_image_replication_status(
2281 &self,
2282 request: &AwsRequest,
2283 ) -> Result<AwsResponse, AwsServiceError> {
2284 let body = request.json_body();
2285 let name = req_str(&body, "repositoryName")?.to_string();
2286 let image_id = body
2287 .get("imageId")
2288 .cloned()
2289 .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2290 let account = target_account_id(request, &body);
2291 let accounts = self.state.read();
2292 let state = accounts
2293 .get(&account)
2294 .ok_or_else(|| repository_not_found(&name))?;
2295 let repo = state
2296 .repositories
2297 .get(&name)
2298 .ok_or_else(|| repository_not_found(&name))?;
2299 let Some(digest) = resolve_image_digest(repo, &image_id) else {
2300 return Err(image_not_found(&name, &image_id));
2301 };
2302 let statuses: Vec<Value> = repo
2303 .replication_statuses
2304 .get(&digest)
2305 .map(|entries| {
2306 entries
2307 .iter()
2308 .map(|s| {
2309 let mut obj = serde_json::Map::new();
2310 obj.insert("region".into(), Value::String(s.region.clone()));
2311 obj.insert("registryId".into(), Value::String(s.registry_id.clone()));
2312 obj.insert("status".into(), Value::String(s.status.clone()));
2313 if let Some(ref code) = s.failure_code {
2314 obj.insert("failureCode".into(), Value::String(code.clone()));
2315 }
2316 if let Some(ref reason) = s.failure_reason {
2317 obj.insert("failureReason".into(), Value::String(reason.clone()));
2318 }
2319 Value::Object(obj)
2320 })
2321 .collect()
2322 })
2323 .unwrap_or_default();
2324 Ok(AwsResponse::ok_json(json!({
2325 "repositoryName": name,
2326 "imageId": image_id,
2327 "replicationStatuses": statuses,
2328 })))
2329 }
2330
2331 fn create_pull_through_cache_rule(
2332 &self,
2333 request: &AwsRequest,
2334 ) -> Result<AwsResponse, AwsServiceError> {
2335 use crate::state::PullThroughCacheRule;
2336 let body = request.json_body();
2337 let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2338 validate_pullthrough_prefix(&prefix)?;
2339 let upstream_url = req_str(&body, "upstreamRegistryUrl")?.to_string();
2340 let account = target_account_id(request, &body);
2341 let mut accounts = self.state.write();
2342 let state = accounts.get_or_create(&account);
2343 if state.pull_through_cache_rules.contains_key(&prefix) {
2344 return Err(AwsServiceError::aws_error(
2345 StatusCode::BAD_REQUEST,
2346 "PullThroughCacheRuleAlreadyExistsException",
2347 format!("A pull through cache rule with the prefix '{prefix}' already exists."),
2348 ));
2349 }
2350 let now = Utc::now();
2351 let rule = PullThroughCacheRule {
2352 ecr_repository_prefix: prefix.clone(),
2353 upstream_registry_url: upstream_url.clone(),
2354 upstream_registry: opt_str(&body, "upstreamRegistry").map(|s| s.to_string()),
2355 credential_arn: opt_str(&body, "credentialArn").map(|s| s.to_string()),
2356 created_at: now,
2357 updated_at: now,
2358 custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
2359 };
2360 state
2361 .pull_through_cache_rules
2362 .insert(prefix.clone(), rule.clone());
2363 Ok(AwsResponse::ok_json(pull_through_rule_json(
2364 state.account_id.as_str(),
2365 &rule,
2366 )))
2367 }
2368
2369 fn delete_pull_through_cache_rule(
2370 &self,
2371 request: &AwsRequest,
2372 ) -> Result<AwsResponse, AwsServiceError> {
2373 let body = request.json_body();
2374 let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2375 validate_pullthrough_prefix(&prefix)?;
2376 let account = target_account_id(request, &body);
2377 let mut accounts = self.state.write();
2378 let state = accounts.get_or_create(&account);
2379 let removed = state
2380 .pull_through_cache_rules
2381 .remove(&prefix)
2382 .ok_or_else(|| {
2383 AwsServiceError::aws_error(
2384 StatusCode::BAD_REQUEST,
2385 "PullThroughCacheRuleNotFoundException",
2386 format!("No pull through cache rule with prefix '{prefix}' exists."),
2387 )
2388 })?;
2389 let mut response = pull_through_rule_json(state.account_id.as_str(), &removed);
2392 if let Value::Object(ref mut map) = response {
2393 map.remove("upstreamRegistry");
2394 }
2395 Ok(AwsResponse::ok_json(response))
2396 }
2397
2398 fn describe_pull_through_cache_rules(
2399 &self,
2400 request: &AwsRequest,
2401 ) -> Result<AwsResponse, AwsServiceError> {
2402 let body = request.json_body();
2403 validate_max_results(&body)?;
2404 let prefixes: Vec<String> = body
2405 .get("ecrRepositoryPrefixes")
2406 .and_then(|v| v.as_array())
2407 .map(|arr| {
2408 arr.iter()
2409 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2410 .collect()
2411 })
2412 .unwrap_or_default();
2413 let account = target_account_id(request, &body);
2414 let accounts = self.state.read();
2415 let state = accounts.get(&account);
2416 let rules: Vec<&crate::state::PullThroughCacheRule> = state
2417 .map(|s| s.pull_through_cache_rules.values().collect())
2418 .unwrap_or_default();
2419 let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
2420 let filtered: Vec<Value> = rules
2421 .iter()
2422 .filter(|r| prefixes.is_empty() || prefixes.contains(&r.ecr_repository_prefix))
2423 .map(|r| pull_through_rule_json_with_updated(®istry_id, r))
2424 .collect();
2425 Ok(AwsResponse::ok_json(json!({
2426 "pullThroughCacheRules": filtered,
2427 })))
2428 }
2429
2430 fn update_pull_through_cache_rule(
2431 &self,
2432 request: &AwsRequest,
2433 ) -> Result<AwsResponse, AwsServiceError> {
2434 let body = request.json_body();
2435 let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2436 let account = target_account_id(request, &body);
2437 let mut accounts = self.state.write();
2438 let state = accounts.get_or_create(&account);
2439 let rule = state
2440 .pull_through_cache_rules
2441 .get_mut(&prefix)
2442 .ok_or_else(|| {
2443 AwsServiceError::aws_error(
2444 StatusCode::BAD_REQUEST,
2445 "PullThroughCacheRuleNotFoundException",
2446 format!("No pull through cache rule with prefix '{prefix}' exists."),
2447 )
2448 })?;
2449 if let Some(cred) = opt_str(&body, "credentialArn") {
2450 rule.credential_arn = Some(cred.to_string());
2451 }
2452 if let Some(role) = opt_str(&body, "customRoleArn") {
2453 rule.custom_role_arn = Some(role.to_string());
2454 }
2455 rule.updated_at = Utc::now();
2456 let response = pull_through_rule_json_with_updated(state.account_id.as_str(), rule);
2457 Ok(AwsResponse::ok_json(response))
2458 }
2459
2460 fn validate_pull_through_cache_rule(
2461 &self,
2462 request: &AwsRequest,
2463 ) -> Result<AwsResponse, AwsServiceError> {
2464 let body = request.json_body();
2465 let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
2466 let account = target_account_id(request, &body);
2467 let accounts = self.state.read();
2468 let state = accounts.get(&account);
2469 let rule = state
2470 .and_then(|s| s.pull_through_cache_rules.get(&prefix))
2471 .ok_or_else(|| {
2472 AwsServiceError::aws_error(
2473 StatusCode::BAD_REQUEST,
2474 "PullThroughCacheRuleNotFoundException",
2475 format!("No pull through cache rule with prefix '{prefix}' exists."),
2476 )
2477 })?;
2478 let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
2479 let mut base = pull_through_rule_json(®istry_id, rule);
2480 base["isValid"] = json!(true);
2481 Ok(AwsResponse::ok_json(base))
2482 }
2483
2484 fn get_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2485 let body = request.json_body();
2486 let name = req_str(&body, "name")?.to_string();
2487 validate_account_setting_name(&name)?;
2488 let account = target_account_id(request, &body);
2489 let accounts = self.state.read();
2490 let state = accounts.get(&account);
2491 let value = state
2492 .and_then(|s| s.account_settings.get(&name).cloned())
2493 .unwrap_or_else(|| "DISABLED".to_string());
2494 Ok(AwsResponse::ok_json(json!({
2495 "name": name,
2496 "value": value,
2497 })))
2498 }
2499
2500 fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2501 let body = request.json_body();
2502 let name = req_str(&body, "name")?.to_string();
2503 validate_account_setting_name(&name)?;
2504 let value = req_str(&body, "value")?.to_string();
2505 let account = target_account_id(request, &body);
2506 let mut accounts = self.state.write();
2507 let state = accounts.get_or_create(&account);
2508 state.account_settings.insert(name.clone(), value.clone());
2509 Ok(AwsResponse::ok_json(json!({
2510 "name": name,
2511 "value": value,
2512 })))
2513 }
2514
2515 fn create_repository_creation_template(
2516 &self,
2517 request: &AwsRequest,
2518 ) -> Result<AwsResponse, AwsServiceError> {
2519 use crate::state::{EncryptionConfiguration as Enc, RepositoryCreationTemplate};
2520 let body = request.json_body();
2521 let prefix = req_str(&body, "prefix")?.to_string();
2522 validate_template_prefix(&prefix)?;
2523 let applied_for: Vec<String> = body
2524 .get("appliedFor")
2525 .and_then(|v| v.as_array())
2526 .map(|arr| {
2527 arr.iter()
2528 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2529 .collect()
2530 })
2531 .unwrap_or_default();
2532 let image_tag_mutability = opt_str(&body, "imageTagMutability")
2533 .unwrap_or("MUTABLE")
2534 .to_string();
2535 let resource_tags = body
2536 .get("resourceTags")
2537 .and_then(|v| v.as_array())
2538 .cloned()
2539 .unwrap_or_default();
2540 let encryption = body.get("encryptionConfiguration").map(|v| Enc {
2541 encryption_type: v
2542 .get("encryptionType")
2543 .and_then(|x| x.as_str())
2544 .unwrap_or("AES256")
2545 .to_string(),
2546 kms_key: v
2547 .get("kmsKey")
2548 .and_then(|x| x.as_str())
2549 .map(|s| s.to_string()),
2550 });
2551 let account = target_account_id(request, &body);
2552 let mut accounts = self.state.write();
2553 let state = accounts.get_or_create(&account);
2554 if state.repository_creation_templates.contains_key(&prefix) {
2555 return Err(AwsServiceError::aws_error(
2556 StatusCode::BAD_REQUEST,
2557 "TemplateAlreadyExistsException",
2558 format!(
2559 "A repository creation template with the prefix '{prefix}' already exists."
2560 ),
2561 ));
2562 }
2563 let now = Utc::now();
2564 let tpl = RepositoryCreationTemplate {
2565 prefix: prefix.clone(),
2566 description: opt_str(&body, "description").map(|s| s.to_string()),
2567 image_tag_mutability,
2568 applied_for,
2569 resource_tags,
2570 created_at: now,
2571 updated_at: now,
2572 custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
2573 repository_policy: opt_str(&body, "repositoryPolicy").map(|s| s.to_string()),
2574 lifecycle_policy: opt_str(&body, "lifecyclePolicy").map(|s| s.to_string()),
2575 encryption_configuration: encryption,
2576 };
2577 state
2578 .repository_creation_templates
2579 .insert(prefix, tpl.clone());
2580 Ok(AwsResponse::ok_json(json!({
2581 "registryId": state.account_id,
2582 "repositoryCreationTemplate": template_to_json(&tpl),
2583 })))
2584 }
2585
2586 fn delete_repository_creation_template(
2587 &self,
2588 request: &AwsRequest,
2589 ) -> Result<AwsResponse, AwsServiceError> {
2590 let body = request.json_body();
2591 let prefix = req_str(&body, "prefix")?.to_string();
2592 validate_template_prefix(&prefix)?;
2593 let account = target_account_id(request, &body);
2594 let mut accounts = self.state.write();
2595 let state = accounts.get_or_create(&account);
2596 let removed = state
2597 .repository_creation_templates
2598 .remove(&prefix)
2599 .ok_or_else(|| {
2600 AwsServiceError::aws_error(
2601 StatusCode::BAD_REQUEST,
2602 "TemplateNotFoundException",
2603 format!("No repository creation template with prefix '{prefix}' exists."),
2604 )
2605 })?;
2606 Ok(AwsResponse::ok_json(json!({
2607 "registryId": state.account_id,
2608 "repositoryCreationTemplate": template_to_json(&removed),
2609 })))
2610 }
2611
2612 fn describe_repository_creation_templates(
2613 &self,
2614 request: &AwsRequest,
2615 ) -> Result<AwsResponse, AwsServiceError> {
2616 let body = request.json_body();
2617 validate_max_results(&body)?;
2618 let prefixes: Vec<String> = body
2619 .get("prefixes")
2620 .and_then(|v| v.as_array())
2621 .map(|arr| {
2622 arr.iter()
2623 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2624 .collect()
2625 })
2626 .unwrap_or_default();
2627 let account = target_account_id(request, &body);
2628 let accounts = self.state.read();
2629 let state = accounts.get(&account);
2630 let tpls: Vec<Value> = state
2631 .map(|s| {
2632 s.repository_creation_templates
2633 .values()
2634 .filter(|t| prefixes.is_empty() || prefixes.contains(&t.prefix))
2635 .map(template_to_json)
2636 .collect()
2637 })
2638 .unwrap_or_default();
2639 Ok(AwsResponse::ok_json(json!({
2640 "registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
2641 "repositoryCreationTemplates": tpls,
2642 })))
2643 }
2644
2645 fn update_repository_creation_template(
2646 &self,
2647 request: &AwsRequest,
2648 ) -> Result<AwsResponse, AwsServiceError> {
2649 let body = request.json_body();
2650 let prefix = req_str(&body, "prefix")?.to_string();
2651 validate_template_prefix(&prefix)?;
2652 let account = target_account_id(request, &body);
2653 let mut accounts = self.state.write();
2654 let state = accounts.get_or_create(&account);
2655 let tpl = state
2656 .repository_creation_templates
2657 .get_mut(&prefix)
2658 .ok_or_else(|| {
2659 AwsServiceError::aws_error(
2660 StatusCode::BAD_REQUEST,
2661 "TemplateNotFoundException",
2662 format!("No repository creation template with prefix '{prefix}' exists."),
2663 )
2664 })?;
2665 if let Some(desc) = opt_str(&body, "description") {
2666 tpl.description = Some(desc.to_string());
2667 }
2668 if let Some(mutability) = opt_str(&body, "imageTagMutability") {
2669 tpl.image_tag_mutability = mutability.to_string();
2670 }
2671 if let Some(arr) = body.get("appliedFor").and_then(|v| v.as_array()) {
2672 tpl.applied_for = arr
2673 .iter()
2674 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2675 .collect();
2676 }
2677 if let Some(arr) = body.get("resourceTags").and_then(|v| v.as_array()) {
2678 tpl.resource_tags = arr.clone();
2679 }
2680 tpl.updated_at = Utc::now();
2681 Ok(AwsResponse::ok_json(json!({
2682 "registryId": state.account_id,
2683 "repositoryCreationTemplate": template_to_json(tpl),
2684 })))
2685 }
2686
2687 fn get_signing_configuration(
2688 &self,
2689 request: &AwsRequest,
2690 ) -> Result<AwsResponse, AwsServiceError> {
2691 let body = request.json_body();
2692 let account = target_account_id(request, &body);
2693 let accounts = self.state.read();
2694 let state = accounts.get(&account);
2695 let rules: Vec<Value> = state
2696 .and_then(|s| s.signing_configuration.as_ref())
2697 .map(|c| c.rules.clone())
2698 .unwrap_or_default();
2699 Ok(AwsResponse::ok_json(json!({
2700 "registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
2701 "signingConfiguration": {"rules": rules},
2702 })))
2703 }
2704
2705 fn put_signing_configuration(
2706 &self,
2707 request: &AwsRequest,
2708 ) -> Result<AwsResponse, AwsServiceError> {
2709 use crate::signing::TrustedKey;
2710 use crate::state::SigningConfiguration;
2711 let body = request.json_body();
2712 let cfg = body
2713 .get("signingConfiguration")
2714 .ok_or_else(|| invalid_parameter("Missing required field: signingConfiguration"))?;
2715 let rules: Vec<Value> = cfg
2716 .get("rules")
2717 .and_then(|v| v.as_array())
2718 .cloned()
2719 .unwrap_or_default();
2720
2721 let mut trusted_keys: Vec<TrustedKey> = Vec::new();
2727 for rule in &rules {
2728 let keys = match rule.get("trustedKeys").and_then(|v| v.as_array()) {
2729 Some(k) => k,
2730 None => continue,
2731 };
2732 for k in keys {
2733 let key_id = k
2734 .get("keyId")
2735 .and_then(|v| v.as_str())
2736 .unwrap_or_default()
2737 .to_string();
2738 let pem = match k.get("pem").and_then(|v| v.as_str()) {
2739 Some(p) => p.to_string(),
2740 None => continue,
2741 };
2742 let algorithm = k
2743 .get("algorithm")
2744 .and_then(|v| v.as_str())
2745 .unwrap_or("ECDSA-P256")
2746 .to_string();
2747 if <p256::ecdsa::VerifyingKey as p256::pkcs8::DecodePublicKey>::from_public_key_pem(
2751 &pem,
2752 )
2753 .is_err()
2754 {
2755 return Err(invalid_parameter(format!(
2756 "trusted key {key_id} is not a valid ECDSA-P256 PEM-encoded public key"
2757 )));
2758 }
2759 trusted_keys.push(TrustedKey {
2760 key_id,
2761 pem,
2762 algorithm,
2763 });
2764 }
2765 }
2766
2767 let account = target_account_id(request, &body);
2768 let mut accounts = self.state.write();
2769 let state = accounts.get_or_create(&account);
2770 state.signing_configuration = Some(SigningConfiguration {
2771 rules: rules.clone(),
2772 trusted_keys,
2773 });
2774 Ok(AwsResponse::ok_json(json!({
2775 "signingConfiguration": {"rules": rules},
2776 })))
2777 }
2778
2779 fn delete_signing_configuration(
2780 &self,
2781 request: &AwsRequest,
2782 ) -> Result<AwsResponse, AwsServiceError> {
2783 let body = request.json_body();
2784 let account = target_account_id(request, &body);
2785 let mut accounts = self.state.write();
2786 let state = accounts.get_or_create(&account);
2787 state.signing_configuration = None;
2788 Ok(AwsResponse::ok_json(json!({})))
2789 }
2790
2791 fn describe_image_signing_status(
2792 &self,
2793 request: &AwsRequest,
2794 ) -> Result<AwsResponse, AwsServiceError> {
2795 let body = request.json_body();
2796 let name = req_str(&body, "repositoryName")?.to_string();
2797 let image_id = body
2798 .get("imageId")
2799 .cloned()
2800 .ok_or_else(|| invalid_parameter("Missing imageId"))?;
2801 let account = target_account_id(request, &body);
2802 let accounts = self.state.read();
2803 let state = accounts
2804 .get(&account)
2805 .ok_or_else(|| repository_not_found(&name))?;
2806 let repo = state
2807 .repositories
2808 .get(&name)
2809 .ok_or_else(|| repository_not_found(&name))?;
2810 let image_digest = resolve_image_digest(repo, &image_id)
2811 .ok_or_else(|| image_not_found(&name, &image_id))?;
2812
2813 let trusted_keys: &[crate::signing::TrustedKey] = state
2814 .signing_configuration
2815 .as_ref()
2816 .map(|c| c.trusted_keys.as_slice())
2817 .unwrap_or(&[]);
2818
2819 let sig_tag = match crate::signing::companion_sig_tag(&image_digest) {
2822 Some(t) => t,
2823 None => {
2824 return Ok(AwsResponse::ok_json(json!({
2825 "registryId": repo.registry_id,
2826 "repositoryName": name,
2827 "imageId": image_id,
2828 "imageSignatures": [],
2829 "signingStatus": "UNSIGNED",
2830 })));
2831 }
2832 };
2833 let sig_manifest_digest = match repo.image_tags.get(&sig_tag) {
2834 Some(d) => d,
2835 None => {
2836 return Ok(AwsResponse::ok_json(json!({
2837 "registryId": repo.registry_id,
2838 "repositoryName": name,
2839 "imageId": image_id,
2840 "imageSignatures": [],
2841 "signingStatus": "UNSIGNED",
2842 })));
2843 }
2844 };
2845 let sig_image = match repo.images.get(sig_manifest_digest) {
2846 Some(i) => i,
2847 None => {
2848 return Ok(AwsResponse::ok_json(json!({
2849 "registryId": repo.registry_id,
2850 "repositoryName": name,
2851 "imageId": image_id,
2852 "imageSignatures": [],
2853 "signingStatus": "UNSIGNED",
2854 })));
2855 }
2856 };
2857
2858 let manifest_json: Value = match serde_json::from_str(&sig_image.image_manifest) {
2859 Ok(v) => v,
2860 Err(_) => {
2861 return Ok(AwsResponse::ok_json(json!({
2862 "registryId": repo.registry_id,
2863 "repositoryName": name,
2864 "imageId": image_id,
2865 "imageSignatures": [],
2866 "signingStatus": "INVALID_SIGNATURE",
2867 })));
2868 }
2869 };
2870 let (layer_digest, signature_b64) =
2871 match crate::signing::extract_signature_annotation(&manifest_json) {
2872 Some(x) => x,
2873 None => {
2874 return Ok(AwsResponse::ok_json(json!({
2875 "registryId": repo.registry_id,
2876 "repositoryName": name,
2877 "imageId": image_id,
2878 "imageSignatures": [],
2879 "signingStatus": "UNSIGNED",
2880 })));
2881 }
2882 };
2883
2884 let payload_bytes: Vec<u8> = match repo.layers.get(&layer_digest) {
2886 Some(layer) => base64::Engine::decode(
2887 &base64::engine::general_purpose::STANDARD,
2888 layer.blob_b64.as_bytes(),
2889 )
2890 .unwrap_or_default(),
2891 None => {
2892 return Ok(AwsResponse::ok_json(json!({
2893 "registryId": repo.registry_id,
2894 "repositoryName": name,
2895 "imageId": image_id,
2896 "imageSignatures": [],
2897 "signingStatus": "UNSIGNED",
2898 })));
2899 }
2900 };
2901
2902 if let Some(named) = crate::signing::referenced_image_digest(&payload_bytes) {
2905 if named != image_digest {
2906 return Ok(AwsResponse::ok_json(json!({
2907 "registryId": repo.registry_id,
2908 "repositoryName": name,
2909 "imageId": image_id,
2910 "imageSignatures": [],
2911 "signingStatus": "INVALID_SIGNATURE",
2912 "statusReason": "signature payload references a different image digest",
2913 })));
2914 }
2915 }
2916
2917 let mut matched: Option<&crate::signing::TrustedKey> = None;
2919 for key in trusted_keys {
2920 if crate::signing::verify_cosign_signature(&key.pem, &payload_bytes, &signature_b64)
2921 .is_ok()
2922 {
2923 matched = Some(key);
2924 break;
2925 }
2926 }
2927
2928 let mut response = json!({
2929 "registryId": repo.registry_id,
2930 "repositoryName": name,
2931 "imageId": image_id,
2932 });
2933 if let Some(key) = matched {
2934 response["imageSignatures"] = json!([{
2935 "signatureFormat": "COSIGN",
2936 "keyId": key.key_id,
2937 "algorithm": key.algorithm,
2938 "valid": true,
2939 }]);
2940 response["signingStatus"] = json!("SIGNED");
2941 } else if trusted_keys.is_empty() {
2942 response["imageSignatures"] = json!([{
2946 "signatureFormat": "COSIGN",
2947 "valid": false,
2948 "statusReason": "no trusted keys configured"
2949 }]);
2950 response["signingStatus"] = json!("UNVERIFIED");
2951 } else {
2952 response["imageSignatures"] = json!([{
2953 "signatureFormat": "COSIGN",
2954 "valid": false,
2955 "statusReason": "signature did not match any trusted key"
2956 }]);
2957 response["signingStatus"] = json!("INVALID_SIGNATURE");
2958 }
2959 Ok(AwsResponse::ok_json(response))
2960 }
2961
2962 fn register_pull_time_update_exclusion(
2963 &self,
2964 request: &AwsRequest,
2965 ) -> Result<AwsResponse, AwsServiceError> {
2966 use crate::state::PullTimeExclusion;
2967 let body = request.json_body();
2968 let principal_arn = req_str(&body, "principalArn")?.to_string();
2969 validate_string_length("principalArn", &principal_arn, 0, 200)?;
2971 let account = target_account_id(request, &body);
2972 let mut accounts = self.state.write();
2973 let state = accounts.get_or_create(&account);
2974 state
2975 .pull_time_exclusions
2976 .entry(principal_arn.clone())
2977 .or_insert_with(|| PullTimeExclusion {
2978 principal_arn: principal_arn.clone(),
2979 registered_at: Utc::now(),
2980 });
2981 Ok(AwsResponse::ok_json(json!({
2982 "principalArn": principal_arn,
2983 })))
2984 }
2985
2986 fn deregister_pull_time_update_exclusion(
2987 &self,
2988 request: &AwsRequest,
2989 ) -> Result<AwsResponse, AwsServiceError> {
2990 let body = request.json_body();
2991 let principal_arn = req_str(&body, "principalArn")?.to_string();
2992 validate_string_length("principalArn", &principal_arn, 0, 200)?;
2993 let account = target_account_id(request, &body);
2994 let mut accounts = self.state.write();
2995 let state = accounts.get_or_create(&account);
2996 state.pull_time_exclusions.remove(&principal_arn);
2997 Ok(AwsResponse::ok_json(json!({
2998 "principalArn": principal_arn,
2999 })))
3000 }
3001
3002 fn list_pull_time_update_exclusions(
3003 &self,
3004 request: &AwsRequest,
3005 ) -> Result<AwsResponse, AwsServiceError> {
3006 let body = request.json_body();
3007 validate_max_results(&body)?;
3008 let account = target_account_id(request, &body);
3009 let accounts = self.state.read();
3010 let state = accounts.get(&account);
3011 let exclusions: Vec<Value> = state
3012 .map(|s| {
3013 s.pull_time_exclusions
3014 .values()
3015 .map(|e| {
3016 json!({
3017 "principalArn": e.principal_arn,
3018 "registeredAt": e.registered_at.timestamp(),
3019 })
3020 })
3021 .collect()
3022 })
3023 .unwrap_or_default();
3024 Ok(AwsResponse::ok_json(json!({
3025 "pullTimeUpdateExclusions": exclusions,
3026 })))
3027 }
3028
3029 fn list_image_referrers(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3030 let body = request.json_body();
3031 let name = req_str(&body, "repositoryName")?.to_string();
3032 let subject = body
3033 .get("subjectId")
3034 .cloned()
3035 .ok_or_else(|| invalid_parameter("Missing subjectId"))?;
3036 let digest = subject
3037 .get("imageDigest")
3038 .and_then(|v| v.as_str())
3039 .ok_or_else(|| invalid_parameter("subjectId.imageDigest is required"))?
3040 .to_string();
3041 let filter = body.get("filter").cloned().unwrap_or(Value::Null);
3044 let artifact_type_filter: Option<Vec<String>> = filter
3045 .get("artifactTypes")
3046 .and_then(|v| v.as_array())
3047 .map(|arr| {
3048 arr.iter()
3049 .filter_map(|v| v.as_str().map(str::to_string))
3050 .collect()
3051 });
3052 let artifact_status_filter: String = filter
3053 .get("artifactStatus")
3054 .and_then(|v| v.as_str())
3055 .unwrap_or("ACTIVE")
3056 .to_string();
3057 let account = target_account_id(request, &body);
3058 let accounts = self.state.read();
3059 let state = accounts
3060 .get(&account)
3061 .ok_or_else(|| repository_not_found(&name))?;
3062 let repo = state
3063 .repositories
3064 .get(&name)
3065 .ok_or_else(|| repository_not_found(&name))?;
3066 if !repo.images.contains_key(&digest) {
3067 return Err(AwsServiceError::aws_error(
3068 StatusCode::BAD_REQUEST,
3069 "ImageNotFoundException",
3070 format!("Subject image {digest} not found in repository '{name}'"),
3071 ));
3072 }
3073 let mut referrers: Vec<Value> = Vec::new();
3077 for image in repo.images.values() {
3078 let parsed: Value = match serde_json::from_str(&image.image_manifest) {
3079 Ok(v) => v,
3080 Err(_) => continue,
3081 };
3082 let subject_digest = parsed
3083 .get("subject")
3084 .and_then(|s| s.get("digest"))
3085 .and_then(|d| d.as_str());
3086 if subject_digest != Some(digest.as_str()) {
3087 continue;
3088 }
3089 let artifact_type = parsed
3092 .get("artifactType")
3093 .and_then(|v| v.as_str())
3094 .or_else(|| {
3095 parsed
3096 .get("config")
3097 .and_then(|c| c.get("mediaType"))
3098 .and_then(|v| v.as_str())
3099 })
3100 .map(str::to_string);
3101 if let Some(ref allowed) = artifact_type_filter {
3102 if !allowed.iter().any(|t| Some(t) == artifact_type.as_ref()) {
3103 continue;
3104 }
3105 }
3106 if artifact_status_filter != "ANY" && image.image_status != artifact_status_filter {
3109 continue;
3110 }
3111 let annotations = parsed
3113 .get("annotations")
3114 .cloned()
3115 .unwrap_or_else(|| json!({}));
3116 let mut referrer = json!({
3117 "digest": image.image_digest,
3118 "mediaType": image.image_manifest_media_type,
3119 "size": image.image_size_in_bytes,
3120 "annotations": annotations,
3121 "artifactStatus": image.image_status,
3122 });
3123 if let Some(t) = artifact_type {
3124 referrer["artifactType"] = json!(t);
3125 }
3126 referrers.push(referrer);
3127 }
3128 Ok(AwsResponse::ok_json(json!({
3129 "referrers": referrers,
3130 })))
3131 }
3132
3133 fn update_image_storage_class(
3134 &self,
3135 request: &AwsRequest,
3136 ) -> Result<AwsResponse, AwsServiceError> {
3137 let body = request.json_body();
3138 let name = req_str(&body, "repositoryName")?.to_string();
3139 let image_id = body
3140 .get("imageId")
3141 .cloned()
3142 .ok_or_else(|| invalid_parameter("Missing imageId"))?;
3143 let target_class = req_str(&body, "targetStorageClass")?.to_string();
3144 if target_class != "STANDARD" && target_class != "ARCHIVE" {
3147 return Err(invalid_parameter(format!(
3148 "Invalid targetStorageClass '{target_class}'. Must be STANDARD or ARCHIVE."
3149 )));
3150 }
3151 let account = target_account_id(request, &body);
3152 let mut accounts = self.state.write();
3153 let state = accounts
3154 .get_mut(&account)
3155 .ok_or_else(|| repository_not_found(&name))?;
3156 let repo = state
3157 .repositories
3158 .get_mut(&name)
3159 .ok_or_else(|| repository_not_found(&name))?;
3160 let digest = resolve_image_digest(repo, &image_id)
3161 .ok_or_else(|| image_not_found(&name, &image_id))?;
3162 let registry_id = repo.registry_id.clone();
3163 let now = Utc::now();
3164 let image = repo.images.get_mut(&digest).expect("digest resolves");
3165 let new_status = match target_class.as_str() {
3166 "ARCHIVE" => {
3167 image.last_archived_at = Some(now);
3168 "ARCHIVED"
3169 }
3170 _ => {
3172 image.last_activated_at = Some(now);
3173 "ACTIVE"
3174 }
3175 };
3176 image.image_status = new_status.to_string();
3177 Ok(AwsResponse::ok_json(json!({
3178 "registryId": registry_id,
3179 "repositoryName": name,
3180 "imageId": image_id,
3181 "imageStatus": new_status,
3182 })))
3183 }
3184}
3185
3186#[path = "service_helpers.rs"]
3187mod service_helpers;
3188pub use service_helpers::evaluate_lifecycle_policy;
3189pub(crate) use service_helpers::*;
3190
3191#[cfg(test)]
3192#[path = "service_tests.rs"]
3193mod tests;