use std::collections::BTreeMap;
use std::sync::Arc;
use async_trait::async_trait;
use base64::engine::general_purpose::STANDARD as B64;
use base64::Engine;
use chrono::Utc;
use http::StatusCode;
use serde_json::{json, Map, Value};
use sha2::{Digest, Sha256};
use tokio::sync::Mutex as AsyncMutex;
use uuid::Uuid;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
use fakecloud_core::validation::validate_string_length;
use fakecloud_persistence::SnapshotStore;
use crate::state::{
EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
};
const SUPPORTED_ACTIONS: &[&str] = &[
"CreateRepository",
"DeleteRepository",
"DescribeRepositories",
"PutImageTagMutability",
"PutImageScanningConfiguration",
"SetRepositoryPolicy",
"GetRepositoryPolicy",
"DeleteRepositoryPolicy",
"TagResource",
"UntagResource",
"ListTagsForResource",
"PutImage",
"BatchGetImage",
"BatchDeleteImage",
"BatchCheckLayerAvailability",
"DescribeImages",
"ListImages",
"GetDownloadUrlForLayer",
"InitiateLayerUpload",
"UploadLayerPart",
"CompleteLayerUpload",
"GetAuthorizationToken",
"PutLifecyclePolicy",
"GetLifecyclePolicy",
"DeleteLifecyclePolicy",
"StartLifecyclePolicyPreview",
"GetLifecyclePolicyPreview",
"StartImageScan",
"DescribeImageScanFindings",
"DescribeRegistry",
"GetRegistryPolicy",
"PutRegistryPolicy",
"DeleteRegistryPolicy",
"GetRegistryScanningConfiguration",
"PutRegistryScanningConfiguration",
"BatchGetRepositoryScanningConfiguration",
"PutReplicationConfiguration",
"DescribeImageReplicationStatus",
"CreatePullThroughCacheRule",
"DeletePullThroughCacheRule",
"DescribePullThroughCacheRules",
"UpdatePullThroughCacheRule",
"ValidatePullThroughCacheRule",
"GetAccountSetting",
"PutAccountSetting",
"CreateRepositoryCreationTemplate",
"DeleteRepositoryCreationTemplate",
"DescribeRepositoryCreationTemplates",
"UpdateRepositoryCreationTemplate",
"GetSigningConfiguration",
"PutSigningConfiguration",
"DeleteSigningConfiguration",
"DescribeImageSigningStatus",
"RegisterPullTimeUpdateExclusion",
"DeregisterPullTimeUpdateExclusion",
"ListPullTimeUpdateExclusions",
"ListImageReferrers",
"UpdateImageStorageClass",
];
pub struct EcrService {
state: SharedEcrState,
snapshot_store: Option<Arc<dyn SnapshotStore>>,
snapshot_lock: Arc<AsyncMutex<()>>,
kms_state: Option<fakecloud_kms::SharedKmsState>,
}
impl EcrService {
pub fn new(state: SharedEcrState) -> Self {
Self {
state,
snapshot_store: None,
snapshot_lock: Arc::new(AsyncMutex::new(())),
kms_state: None,
}
}
pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
self.snapshot_store = Some(store);
self
}
pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
self.kms_state = Some(kms);
self
}
pub fn state_handle(&self) -> &SharedEcrState {
&self.state
}
pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
self.kms_state.as_ref()
}
async fn save_snapshot(&self) {
Self::save_snapshot_with(
self.state.clone(),
self.snapshot_store.clone(),
self.snapshot_lock.clone(),
)
.await
}
pub(crate) async fn save_snapshot_with(
state: SharedEcrState,
store: Option<Arc<dyn SnapshotStore>>,
lock: Arc<AsyncMutex<()>>,
) {
let Some(store) = store else {
return;
};
let _guard = lock.lock().await;
let snapshot = EcrSnapshot {
schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
accounts: Some(state.read().clone()),
};
let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
let bytes = serde_json::to_vec(&snapshot)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
store.save(&bytes)
})
.await;
match join {
Ok(Ok(())) => {}
Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
}
}
}
#[async_trait]
impl AwsService for EcrService {
fn service_name(&self) -> &str {
"ecr"
}
async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
if request
.path_segments
.first()
.map(|s| s == "v2")
.unwrap_or(false)
{
let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
&& request.path_segments.len() >= 5
&& request.path_segments[request.path_segments.len() - 2] == "uploads";
if !is_blob_upload {
if let Some(stream) = request.take_body_stream() {
request.body = fakecloud_core::service::drain_request_stream(stream).await?;
}
}
let result = crate::oci::dispatch(self, &request).await;
let is_pull_get = request.method == http::Method::GET
&& request.path_segments.len() >= 3
&& matches!(
request.path_segments[request.path_segments.len() - 2].as_str(),
"blobs" | "manifests"
);
let mutates_oci = is_pull_get
|| matches!(
request.method,
http::Method::POST
| http::Method::PUT
| http::Method::PATCH
| http::Method::DELETE
);
if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
self.save_snapshot().await;
}
return result;
}
if let Some(stream) = request.take_body_stream() {
request.body = fakecloud_core::service::drain_request_stream(stream).await?;
}
let mutates = is_mutating(request.action.as_str());
let result = match request.action.as_str() {
"CreateRepository" => self.create_repository(&request),
"DeleteRepository" => self.delete_repository(&request),
"DescribeRepositories" => self.describe_repositories(&request),
"PutImageTagMutability" => self.put_image_tag_mutability(&request),
"PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
"SetRepositoryPolicy" => self.set_repository_policy(&request),
"GetRepositoryPolicy" => self.get_repository_policy(&request),
"DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
"TagResource" => self.tag_resource(&request),
"UntagResource" => self.untag_resource(&request),
"ListTagsForResource" => self.list_tags_for_resource(&request),
"PutImage" => self.put_image(&request),
"BatchGetImage" => self.batch_get_image(&request),
"BatchDeleteImage" => self.batch_delete_image(&request),
"BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
"DescribeImages" => self.describe_images(&request),
"ListImages" => self.list_images(&request),
"GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
"InitiateLayerUpload" => self.initiate_layer_upload(&request),
"UploadLayerPart" => self.upload_layer_part(&request),
"CompleteLayerUpload" => self.complete_layer_upload(&request),
"GetAuthorizationToken" => self.get_authorization_token(&request),
"PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
"GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
"DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
"StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
"GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
"StartImageScan" => self.start_image_scan(&request),
"DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
"DescribeRegistry" => self.describe_registry(&request),
"GetRegistryPolicy" => self.get_registry_policy(&request),
"PutRegistryPolicy" => self.put_registry_policy(&request),
"DeleteRegistryPolicy" => self.delete_registry_policy(&request),
"GetRegistryScanningConfiguration" => {
self.get_registry_scanning_configuration(&request)
}
"PutRegistryScanningConfiguration" => {
self.put_registry_scanning_configuration(&request)
}
"BatchGetRepositoryScanningConfiguration" => {
self.batch_get_repository_scanning_configuration(&request)
}
"PutReplicationConfiguration" => self.put_replication_configuration(&request),
"DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
"CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
"DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
"DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
"UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
"ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
"GetAccountSetting" => self.get_account_setting(&request),
"PutAccountSetting" => self.put_account_setting(&request),
"CreateRepositoryCreationTemplate" => {
self.create_repository_creation_template(&request)
}
"DeleteRepositoryCreationTemplate" => {
self.delete_repository_creation_template(&request)
}
"DescribeRepositoryCreationTemplates" => {
self.describe_repository_creation_templates(&request)
}
"UpdateRepositoryCreationTemplate" => {
self.update_repository_creation_template(&request)
}
"GetSigningConfiguration" => self.get_signing_configuration(&request),
"PutSigningConfiguration" => self.put_signing_configuration(&request),
"DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
"DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
"RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
"DeregisterPullTimeUpdateExclusion" => {
self.deregister_pull_time_update_exclusion(&request)
}
"ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
"ListImageReferrers" => self.list_image_referrers(&request),
"UpdateImageStorageClass" => self.update_image_storage_class(&request),
_ => Err(AwsServiceError::action_not_implemented(
self.service_name(),
&request.action,
)),
};
if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
self.save_snapshot().await;
}
result
}
fn supported_actions(&self) -> &[&str] {
SUPPORTED_ACTIONS
}
}
impl EcrService {
fn create_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
validate_repository_name(&name)?;
let image_tag_mutability = opt_str(&body, "imageTagMutability")
.unwrap_or("MUTABLE")
.to_string();
if image_tag_mutability != "MUTABLE" && image_tag_mutability != "IMMUTABLE" {
return Err(invalid_parameter(format!(
"Invalid value for imageTagMutability: {image_tag_mutability}"
)));
}
let scan_on_push = body
.get("imageScanningConfiguration")
.and_then(|v| v.get("scanOnPush"))
.and_then(|v| v.as_bool())
.unwrap_or(false);
let encryption = body
.get("encryptionConfiguration")
.map(|v| EncryptionConfiguration {
encryption_type: v
.get("encryptionType")
.and_then(|x| x.as_str())
.unwrap_or("AES256")
.to_string(),
kms_key: v
.get("kmsKey")
.and_then(|x| x.as_str())
.map(|s| s.to_string()),
})
.unwrap_or_default();
let tags = parse_tags(&body);
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let endpoint = accounts.endpoint().to_string();
let state = accounts.get_or_create(&account);
if state.repositories.contains_key(&name) {
return Err(repository_already_exists(&name));
}
let arn = state.repository_arn(&name);
let mut repo = Repository::new(&name, arn, state.registry_id(), &endpoint);
repo.image_tag_mutability = image_tag_mutability;
repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
repo.encryption_configuration = encryption;
for (k, v) in tags {
repo.tags.insert(k, v);
}
let response = repository_to_json(&repo);
state.repositories.insert(name.clone(), repo);
Ok(AwsResponse::ok_json(json!({ "repository": response })))
}
fn delete_repository(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let force = body.get("force").and_then(|v| v.as_bool()).unwrap_or(false);
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let _ = force;
let snapshot = repository_to_json(repo);
state.repositories.remove(&name);
Ok(AwsResponse::ok_json(json!({ "repository": snapshot })))
}
fn describe_repositories(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
const DEFAULT_PAGE_SIZE: usize = 100;
let body = request.json_body();
let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
Some(n) => {
if !(1..=1000).contains(&n) {
return Err(invalid_parameter(format!(
"Value '{n}' at 'maxResults' failed to satisfy constraint: \
Member must have value between 1 and 1000",
)));
}
n as usize
}
None => DEFAULT_PAGE_SIZE,
};
let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
Some(raw) => raw.parse::<usize>().map_err(|_| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterException",
"The specified parameter is invalid: nextToken",
)
})?,
None => 0,
};
let names: Vec<String> = body
.get("repositoryNames")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
})
.unwrap_or_default();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let Some(state) = accounts.get(&account) else {
return Ok(AwsResponse::ok_json(json!({ "repositories": [] })));
};
let mut out: Vec<Value> = Vec::new();
let mut next_token: Option<String> = None;
if names.is_empty() {
let all: Vec<&Repository> = state.repositories.values().collect();
let start = offset.min(all.len());
let end = (start + max_results).min(all.len());
for repo in &all[start..end] {
out.push(repository_to_json(repo));
}
if end < all.len() {
next_token = Some(end.to_string());
}
} else {
for n in &names {
let repo = state
.repositories
.get(n)
.ok_or_else(|| repository_not_found(n))?;
out.push(repository_to_json(repo));
}
}
let mut response = json!({ "repositories": out });
if let Some(token) = next_token {
response["nextToken"] = json!(token);
}
Ok(AwsResponse::ok_json(response))
}
fn put_image_tag_mutability(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let mutability = req_str(&body, "imageTagMutability")?.to_string();
if mutability != "MUTABLE" && mutability != "IMMUTABLE" {
return Err(invalid_parameter(format!(
"Invalid value for imageTagMutability: {mutability}"
)));
}
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
repo.image_tag_mutability = mutability.clone();
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"imageTagMutability": mutability,
})))
}
fn put_image_scanning_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let scan_on_push = body
.get("imageScanningConfiguration")
.and_then(|v| v.get("scanOnPush"))
.and_then(|v| v.as_bool())
.ok_or_else(|| invalid_parameter("Missing imageScanningConfiguration.scanOnPush"))?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
repo.image_scanning_configuration = ImageScanningConfiguration { scan_on_push };
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"imageScanningConfiguration": { "scanOnPush": scan_on_push },
})))
}
fn set_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let policy_text = req_str(&body, "policyText")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
repo.policy = Some(policy_text.clone());
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"policyText": policy_text,
})))
}
fn get_repository_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let policy = repo
.policy
.clone()
.ok_or_else(|| repository_policy_not_found(&name))?;
Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"policyText": policy,
})))
}
fn delete_repository_policy(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
let policy = repo
.policy
.take()
.ok_or_else(|| repository_policy_not_found(&name))?;
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"policyText": policy,
})))
}
fn tag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let arn = req_str(&body, "resourceArn")?.to_string();
let (arn_account, name) = decode_resource_arn(&arn)?;
let tags = parse_tags(&body);
let account = arn_account.unwrap_or_else(|| request.account_id.clone());
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
for (k, v) in tags {
repo.tags.insert(k, v);
}
Ok(AwsResponse::ok_json(json!({})))
}
fn untag_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let arn = req_str(&body, "resourceArn")?.to_string();
let (arn_account, name) = decode_resource_arn(&arn)?;
let keys: Vec<String> = body
.get("tagKeys")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
})
.unwrap_or_default();
let account = arn_account.unwrap_or_else(|| request.account_id.clone());
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
for k in keys {
repo.tags.remove(&k);
}
Ok(AwsResponse::ok_json(json!({})))
}
fn list_tags_for_resource(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let arn = req_str(&body, "resourceArn")?.to_string();
let (arn_account, name) = decode_resource_arn(&arn)?;
let account = arn_account.unwrap_or_else(|| request.account_id.clone());
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let tags: Vec<Value> = repo
.tags
.iter()
.map(|(k, v)| json!({ "Key": k, "Value": v }))
.collect();
Ok(AwsResponse::ok_json(json!({ "tags": tags })))
}
}
impl EcrService {
fn put_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let manifest = req_str(&body, "imageManifest")?.to_string();
let manifest_media_type = opt_str(&body, "imageManifestMediaType")
.unwrap_or("application/vnd.docker.distribution.manifest.v2+json")
.to_string();
let supplied_tag = opt_str(&body, "imageTag").map(|s| s.to_string());
let supplied_digest = opt_str(&body, "imageDigest").map(|s| s.to_string());
let account = target_account_id(request, &body);
let computed_digest = sha256_digest(manifest.as_bytes());
if let Some(ref supplied) = supplied_digest {
if supplied != &computed_digest {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"ImageDigestDoesNotMatchException",
format!(
"The imageDigest '{supplied}' does not match the digest of the uploaded manifest ('{computed_digest}')."
),
));
}
}
let digest = supplied_digest.unwrap_or_else(|| computed_digest.clone());
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let registry_match =
registry_scan_on_push_matches(&state.registry_scanning_configuration, &name);
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:PutImage",
)?;
if let Some(ref tag) = supplied_tag {
if let Some(existing) = repo.image_tags.get(tag) {
if existing != &digest && repo.image_tag_mutability == "IMMUTABLE" {
return Err(image_already_exists(&name, tag));
}
}
}
let image_entry = repo.images.entry(digest.clone()).or_insert_with(|| Image {
image_digest: digest.clone(),
image_manifest: manifest.clone(),
image_manifest_media_type: manifest_media_type.clone(),
artifact_media_type: None,
image_size_in_bytes: manifest.len() as u64,
image_pushed_at: Utc::now(),
last_recorded_pull_time: None,
image_status: "ACTIVE".to_string(),
last_archived_at: None,
last_activated_at: None,
last_in_use_at: None,
in_use_count: 0,
});
image_entry.image_manifest = manifest;
image_entry.image_manifest_media_type = manifest_media_type.clone();
if let Some(tag) = supplied_tag.clone() {
repo.image_tags.insert(tag, digest.clone());
}
let snapshot = repo.images.get(&digest).cloned().unwrap();
let scan_on_push = repo.image_scanning_configuration.scan_on_push;
let should_scan = scan_on_push || registry_match;
let tag_ref = supplied_tag.as_deref();
let response = AwsResponse::ok_json(json!({
"image": {
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id_for(&snapshot, tag_ref),
"imageManifest": snapshot.image_manifest,
"imageManifestMediaType": snapshot.image_manifest_media_type,
}
}));
drop(accounts);
if should_scan {
self.trigger_scan(&account, &name, &digest);
}
self.replicate_image(&account, &name, &digest);
Ok(response)
}
fn replicate_image(&self, source_account: &str, repo_name: &str, digest: &str) {
use crate::state::{ImageReplicationStatus, Repository};
let (rules, image, layer_blobs, source_registry_id, source_region, source_uri) = {
let accounts = self.state.read();
let Some(state) = accounts.get(source_account) else {
return;
};
let Some(cfg) = state.replication_configuration.as_ref() else {
return;
};
let Some(repo) = state.repositories.get(repo_name) else {
return;
};
let Some(image) = repo.images.get(digest).cloned() else {
return;
};
let layers: Vec<crate::state::Layer> = layers_for_image(repo, digest);
(
cfg.rules.clone(),
image,
layers,
repo.registry_id.clone(),
state.region.clone(),
repo.repository_uri.clone(),
)
};
let endpoint = source_uri
.strip_suffix(&format!("/{repo_name}"))
.unwrap_or(&source_uri)
.to_string();
let matching: Vec<_> = rules
.into_iter()
.filter(|rule| repository_filters_match(&rule.repository_filters, repo_name))
.flat_map(|rule| rule.destinations.into_iter())
.collect();
if matching.is_empty() {
return;
}
let mut statuses: Vec<ImageReplicationStatus> = matching
.iter()
.filter(|dest| {
!(dest.registry_id == source_registry_id && dest.region == source_region)
})
.map(|dest| ImageReplicationStatus {
region: dest.region.clone(),
registry_id: dest.registry_id.clone(),
status: "IN_PROGRESS".to_string(),
failure_code: None,
failure_reason: None,
})
.collect();
if statuses.is_empty() {
let mut accounts = self.state.write();
let source_state = accounts.get_or_create(source_account);
if let Some(repo) = source_state.repositories.get_mut(repo_name) {
repo.replication_statuses.remove(digest);
}
return;
}
let mut accounts = self.state.write();
for (idx, dest) in matching.iter().enumerate() {
if dest.registry_id == source_registry_id && dest.region == source_region {
continue;
}
let status_idx = matching
.iter()
.take(idx + 1)
.filter(|d| !(d.registry_id == source_registry_id && d.region == source_region))
.count()
- 1;
if dest.registry_id != source_registry_id {
let target_state = accounts.get_or_create(&dest.registry_id);
if !target_state.repositories.contains_key(repo_name) {
let arn = format!(
"arn:aws:ecr:{}:{}:repository/{}",
dest.region, dest.registry_id, repo_name
);
let repo = Repository::new(repo_name, arn, &dest.registry_id, &endpoint);
target_state
.repositories
.insert(repo_name.to_string(), repo);
}
let target_repo = target_state.repositories.get_mut(repo_name).unwrap();
target_repo
.images
.entry(digest.to_string())
.or_insert_with(|| image.clone());
for layer in &layer_blobs {
target_repo
.layers
.entry(layer.digest.clone())
.or_insert_with(|| layer.clone());
}
}
statuses[status_idx].status = "COMPLETE".to_string();
}
let source_state = accounts.get_or_create(source_account);
if let Some(repo) = source_state.repositories.get_mut(repo_name) {
repo.replication_statuses
.insert(digest.to_string(), statuses);
}
}
fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
use crate::state::ImageScanFindings;
let layers = {
let mut accounts = self.state.write();
let Some(state) = accounts.get_mut(account) else {
return;
};
let Some(repo) = state.repositories.get_mut(name) else {
return;
};
repo.scan_findings.insert(
digest.to_string(),
ImageScanFindings {
image_digest: digest.to_string(),
scan_status: "IN_PROGRESS".to_string(),
scan_completed_at: None,
vulnerability_source_updated_at: None,
finding_severity_counts: BTreeMap::new(),
findings: Vec::new(),
},
);
layers_for_image(repo, digest)
};
let shared = self.state.clone();
let store = self.snapshot_store.clone();
let snap_lock = self.snapshot_lock.clone();
let account = account.to_string();
let name = name.to_string();
let digest = digest.to_string();
tokio::spawn(async move {
let result = crate::scanner::scan_layers(&digest, &layers).await;
{
let mut accounts = shared.write();
let Some(state) = accounts.get_mut(&account) else {
return;
};
let Some(repo) = state.repositories.get_mut(&name) else {
return;
};
let findings = result.unwrap_or_else(|| ImageScanFindings {
image_digest: digest.clone(),
scan_status: "COMPLETE".to_string(),
scan_completed_at: Some(Utc::now()),
vulnerability_source_updated_at: Some(Utc::now()),
finding_severity_counts: BTreeMap::new(),
findings: Vec::new(),
});
repo.scan_findings.insert(digest.clone(), findings);
}
EcrService::save_snapshot_with(shared, store, snap_lock).await;
});
}
fn batch_get_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let ids = body
.get("imageIds")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let exclusions = pull_time_exclusion_set(state);
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:BatchGetImage",
)?;
let mut images: Vec<Value> = Vec::new();
let mut failures: Vec<Value> = Vec::new();
let mut hit_digests: Vec<String> = Vec::new();
for id in &ids {
match resolve_image_digest(repo, id) {
Some(digest) => {
let img = repo.images.get(&digest).unwrap();
let tag = id.get("imageTag").and_then(|v| v.as_str());
images.push(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id_for(img, tag),
"imageManifest": img.image_manifest,
"imageManifestMediaType": img.image_manifest_media_type,
}));
hit_digests.push(digest);
}
None => failures.push(json!({
"imageId": id,
"failureCode": "ImageNotFound",
"failureReason": "Requested image not found",
})),
}
}
let caller_arn = request.principal.as_ref().map(|p| p.arn.as_str());
touch_image_pull(repo, &hit_digests, caller_arn, &exclusions);
Ok(AwsResponse::ok_json(json!({
"images": images,
"failures": failures,
})))
}
fn batch_delete_image(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let ids = body
.get("imageIds")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:BatchDeleteImage",
)?;
let mut deleted: Vec<Value> = Vec::new();
let mut failures: Vec<Value> = Vec::new();
for id in &ids {
if let Some(tag) = id.get("imageTag").and_then(|v| v.as_str()) {
if let Some(digest) = repo.image_tags.remove(tag) {
deleted.push(json!({ "imageDigest": digest, "imageTag": tag }));
let still_tagged = repo.image_tags.values().any(|d| *d == digest);
if !still_tagged {
repo.images.remove(&digest);
}
continue;
}
failures.push(json!({
"imageId": id,
"failureCode": "ImageNotFound",
"failureReason": "Requested image not found",
}));
} else if let Some(digest) = id.get("imageDigest").and_then(|v| v.as_str()) {
if repo.images.remove(digest).is_some() {
repo.image_tags.retain(|_, d| d != digest);
deleted.push(json!({ "imageDigest": digest }));
continue;
}
failures.push(json!({
"imageId": id,
"failureCode": "ImageNotFound",
"failureReason": "Requested image not found",
}));
} else {
failures.push(json!({
"imageId": id,
"failureCode": "InvalidImageTag",
"failureReason": "Either imageDigest or imageTag must be supplied",
}));
}
}
Ok(AwsResponse::ok_json(json!({
"imageIds": deleted,
"failures": failures,
})))
}
fn batch_check_layer_availability(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let digests: Vec<String> = body
.get("layerDigests")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
if digests.is_empty() {
return Err(invalid_parameter(
"At least one layerDigest must be supplied to BatchCheckLayerAvailability",
));
}
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:BatchCheckLayerAvailability",
)?;
let mut layers: Vec<Value> = Vec::new();
let mut failures: Vec<Value> = Vec::new();
for digest in &digests {
match repo.layers.get(digest) {
Some(layer) => layers.push(json!({
"layerDigest": layer.digest,
"layerAvailability": "AVAILABLE",
"layerSize": layer.size,
"mediaType": layer.media_type,
})),
None => failures.push(json!({
"layerDigest": digest,
"failureCode": "MissingLayerDigest",
"failureReason": "Layer not found in repository",
})),
}
}
Ok(AwsResponse::ok_json(json!({
"layers": layers,
"failures": failures,
})))
}
fn describe_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
const DEFAULT_PAGE_SIZE: usize = 100;
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let ids = body
.get("imageIds")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
Some(n) => {
if !(1..=1000).contains(&n) {
return Err(invalid_parameter(format!(
"Value '{n}' at 'maxResults' failed to satisfy constraint: \
Member must have value between 1 and 1000",
)));
}
n as usize
}
None => DEFAULT_PAGE_SIZE,
};
let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
Some(raw) => raw.parse::<usize>().map_err(|_| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterException",
"The specified parameter is invalid: nextToken",
)
})?,
None => 0,
};
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let mut details: Vec<Value> = Vec::new();
let mut next_token: Option<String> = None;
if ids.is_empty() {
let all: Vec<&Image> = repo.images.values().collect();
let start = offset.min(all.len());
let end = (start + max_results).min(all.len());
for img in &all[start..end] {
details.push(image_to_details(repo, img, &repo.registry_id));
}
if end < all.len() {
next_token = Some(end.to_string());
}
} else {
for id in &ids {
let digest =
resolve_image_digest(repo, id).ok_or_else(|| image_not_found(&name, id))?;
let img = repo.images.get(&digest).unwrap();
details.push(image_to_details(repo, img, &repo.registry_id));
}
}
let mut response = json!({ "imageDetails": details });
if let Some(token) = next_token {
response["nextToken"] = json!(token);
}
Ok(AwsResponse::ok_json(response))
}
fn list_images(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
const DEFAULT_PAGE_SIZE: usize = 100;
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let filter_tag_status = body
.get("filter")
.and_then(|v| v.get("tagStatus"))
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let max_results = match body.get("maxResults").and_then(|v| v.as_i64()) {
Some(n) => {
if !(1..=1000).contains(&n) {
return Err(invalid_parameter(format!(
"Value '{n}' at 'maxResults' failed to satisfy constraint: \
Member must have value between 1 and 1000",
)));
}
n as usize
}
None => DEFAULT_PAGE_SIZE,
};
let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
Some(raw) => raw.parse::<usize>().map_err(|_| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterException",
"The specified parameter is invalid: nextToken",
)
})?,
None => 0,
};
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let mut all: Vec<(String, Option<String>)> = Vec::new();
for (tag, digest) in &repo.image_tags {
all.push((digest.clone(), Some(tag.clone())));
}
let tagged_digests: std::collections::HashSet<&String> = repo.image_tags.values().collect();
for digest in repo.images.keys() {
if !tagged_digests.contains(digest) {
all.push((digest.clone(), None));
}
}
all.retain(|(_, tag)| match filter_tag_status.as_deref() {
Some("TAGGED") => tag.is_some(),
Some("UNTAGGED") => tag.is_none(),
_ => true,
});
all.sort();
let start = offset.min(all.len());
let end = (start + max_results).min(all.len());
let ids: Vec<Value> = all[start..end]
.iter()
.map(|(d, t)| {
let mut v = json!({ "imageDigest": d });
if let Some(tag) = t {
v["imageTag"] = json!(tag);
}
v
})
.collect();
let mut response = json!({ "imageIds": ids });
if end < all.len() {
response["nextToken"] = json!(end.to_string());
}
Ok(AwsResponse::ok_json(response))
}
fn get_download_url_for_layer(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let digest = req_str(&body, "layerDigest")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let exclusions = pull_time_exclusion_set(state);
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:GetDownloadUrlForLayer",
)?;
if !repo.layers.contains_key(&digest) {
return Err(layer_not_found(&digest, &name));
}
let mut touched: Vec<String> = Vec::new();
for (img_digest, img) in &repo.images {
let parsed: Value = match serde_json::from_str(&img.image_manifest) {
Ok(v) => v,
Err(_) => continue,
};
let references = parsed
.get("layers")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.any(|l| l.get("digest").and_then(|d| d.as_str()) == Some(digest.as_str()))
})
.unwrap_or(false);
if references {
touched.push(img_digest.clone());
}
}
let caller_arn = request.principal.as_ref().map(|p| p.arn.as_str());
touch_image_pull(repo, &touched, caller_arn, &exclusions);
let endpoint = accounts.endpoint();
let url = format!(
"{}/v2/{}/blobs/{}",
endpoint.trim_end_matches('/'),
name,
digest
);
Ok(AwsResponse::ok_json(json!({
"downloadUrl": url,
"layerDigest": digest,
})))
}
fn initiate_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:InitiateLayerUpload",
)?;
let upload_id = Uuid::new_v4().to_string();
let spool = crate::oci::create_upload_spool(&upload_id).map_err(|e| {
AwsServiceError::aws_error(
StatusCode::INTERNAL_SERVER_ERROR,
"InternalError",
format!("failed to create upload spool: {e}"),
)
})?;
state.layer_uploads.insert(
upload_id.clone(),
LayerUpload {
upload_id: upload_id.clone(),
repository_name: name,
created_at: Utc::now(),
spool_path: spool.to_string_lossy().to_string(),
last_byte_received: 0,
},
);
Ok(AwsResponse::ok_json(json!({
"uploadId": upload_id,
"partSize": 10_485_760u64,
})))
}
fn upload_layer_part(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let upload_id = req_str(&body, "uploadId")?.to_string();
let first_byte = body
.get("partFirstByte")
.and_then(|v| v.as_u64())
.ok_or_else(|| invalid_parameter("Missing partFirstByte"))?;
let last_byte = body
.get("partLastByte")
.and_then(|v| v.as_u64())
.ok_or_else(|| invalid_parameter("Missing partLastByte"))?;
let part_blob_b64 = req_str(&body, "layerPartBlob")?.to_string();
let part_bytes = B64
.decode(part_blob_b64.as_bytes())
.map_err(|_| invalid_layer("layerPartBlob is not valid base64"))?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:UploadLayerPart",
)?;
let upload = state
.layer_uploads
.get_mut(&upload_id)
.ok_or_else(|| upload_not_found(&upload_id))?;
if upload.repository_name != name {
return Err(upload_not_found(&upload_id));
}
if first_byte != upload.last_byte_received {
return Err(invalid_layer(format!(
"Layer part upload out of order: expected partFirstByte {} got {}",
upload.last_byte_received, first_byte,
)));
}
let expected_len = last_byte
.checked_sub(first_byte)
.and_then(|d| d.checked_add(1))
.ok_or_else(|| invalid_layer("partLastByte < partFirstByte"))?;
if part_bytes.len() as u64 != expected_len {
return Err(invalid_layer(format!(
"Layer part size mismatch: bytes {} doesn't match range [{first_byte}, {last_byte}]",
part_bytes.len()
)));
}
let spool = std::path::PathBuf::from(&upload.spool_path);
crate::oci::append_bytes_sync(&spool, &part_bytes).map_err(|e| {
AwsServiceError::aws_error(
StatusCode::INTERNAL_SERVER_ERROR,
"InternalError",
format!("failed to append upload chunk: {e}"),
)
})?;
upload.last_byte_received = last_byte + 1;
Ok(AwsResponse::ok_json(json!({
"registryId": state.registry_id(),
"repositoryName": name,
"uploadId": upload_id,
"lastByteReceived": last_byte,
})))
}
fn get_authorization_token(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let registry_ids: Vec<String> = body
.get("registryIds")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let accounts = self.state.read();
let default_account = accounts.default_account_id().to_string();
let targets = if registry_ids.is_empty() {
vec![default_account]
} else {
registry_ids
};
let endpoint = accounts.endpoint().to_string();
drop(accounts);
let expires_at = (Utc::now() + chrono::Duration::hours(12)).timestamp();
let authorization_data: Vec<Value> = targets
.into_iter()
.map(|_registry_id| {
let token = B64.encode(format!("AWS:{}", Uuid::new_v4()).as_bytes());
json!({
"authorizationToken": token,
"expiresAt": expires_at,
"proxyEndpoint": endpoint,
})
})
.collect();
Ok(AwsResponse::ok_json(json!({
"authorizationData": authorization_data,
})))
}
fn complete_layer_upload(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let upload_id = req_str(&body, "uploadId")?.to_string();
let digests: Vec<String> = body
.get("layerDigests")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
if digests.is_empty() {
return Err(invalid_parameter(
"At least one layerDigest must be supplied to CompleteLayerUpload",
));
}
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
check_repo_policy(
&account,
&request.account_id,
&repo.repository_arn,
&name,
repo.policy.as_deref(),
"ecr:CompleteLayerUpload",
)?;
let upload = state
.layer_uploads
.get(&upload_id)
.ok_or_else(|| upload_not_found(&upload_id))?;
if upload.repository_name != name {
return Err(upload_not_found(&upload_id));
}
let spool = std::path::PathBuf::from(&upload.spool_path);
let blob_bytes = crate::oci::read_spool(&spool).map_err(|e| {
AwsServiceError::aws_error(
StatusCode::INTERNAL_SERVER_ERROR,
"InternalError",
format!("failed to read upload spool: {e}"),
)
})?;
let computed = sha256_digest(&blob_bytes);
if !digests.iter().any(|d| d == &computed) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"LayerDigestMismatchException",
format!(
"The layer digest from the client ({}) does not match the digest of the received bytes ({computed})",
digests.join(",")
),
));
}
let _upload = state.layer_uploads.remove(&upload_id).unwrap();
crate::oci::unlink_spool(&spool);
let size = blob_bytes.len() as u64;
drop(accounts);
let (stored_bytes, encrypted_with) =
crate::oci::encrypt_layer_bytes(self, &account, &name, &blob_bytes);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
repo.layers.insert(
computed.clone(),
Layer {
digest: computed.clone(),
size,
blob_b64: B64.encode(&stored_bytes),
media_type: "application/vnd.docker.image.rootfs.diff.tar.gzip".to_string(),
encrypted_with_kms_key: encrypted_with,
},
);
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"uploadId": upload_id,
"layerDigest": computed,
})))
}
}
impl EcrService {
fn put_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let policy = req_str(&body, "lifecyclePolicyText")?.to_string();
serde_json::from_str::<Value>(&policy)
.map_err(|_| invalid_parameter("lifecyclePolicyText is not valid JSON"))?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
repo.lifecycle_policy = Some(policy.clone());
let prune = evaluate_lifecycle_policy(repo, &policy);
for digest in &prune {
repo.images.remove(digest);
repo.image_tags.retain(|_, d| d != digest);
}
repo.lifecycle_policy_last_evaluated_at = Some(Utc::now());
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"lifecyclePolicyText": policy,
})))
}
fn get_lifecycle_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let policy = repo
.lifecycle_policy
.clone()
.ok_or_else(|| lifecycle_policy_not_found(&name))?;
let last_eval = repo
.lifecycle_policy_last_evaluated_at
.map(|t| t.timestamp())
.unwrap_or(0);
Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"lifecyclePolicyText": policy,
"lastEvaluatedAt": last_eval,
})))
}
fn delete_lifecycle_policy(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
let policy = repo
.lifecycle_policy
.take()
.ok_or_else(|| lifecycle_policy_not_found(&name))?;
let last_eval = repo
.lifecycle_policy_last_evaluated_at
.take()
.map(|t| t.timestamp())
.unwrap_or(0);
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"lifecyclePolicyText": policy,
"lastEvaluatedAt": last_eval,
})))
}
fn start_lifecycle_policy_preview(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let policy = match opt_str(&body, "lifecyclePolicyText") {
Some(s) => s.to_string(),
None => {
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
repo.lifecycle_policy
.clone()
.ok_or_else(|| lifecycle_policy_not_found(&name))?
}
};
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
let _prune = evaluate_lifecycle_policy(repo, &policy);
repo.lifecycle_policy_last_evaluated_at = Some(Utc::now());
let registry_id = repo.registry_id.clone();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"lifecyclePolicyText": policy,
"status": "COMPLETE",
})))
}
fn get_lifecycle_policy_preview(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let policy = repo
.lifecycle_policy
.clone()
.ok_or_else(|| lifecycle_policy_not_found(&name))?;
let prune = evaluate_lifecycle_policy(repo, &policy);
let results: Vec<Value> = prune
.iter()
.map(|digest| {
json!({
"imageDigest": digest,
"imagePushedAt": repo.images.get(digest).map(|i| i.image_pushed_at.timestamp()).unwrap_or(0),
"action": {"type": "EXPIRE"},
})
})
.collect();
Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"lifecyclePolicyText": policy,
"status": "COMPLETE",
"previewResults": results,
"summary": {"expiringImageTotalCount": prune.len()},
})))
}
fn start_image_scan(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
use crate::state::ImageScanFindings;
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let image_id = body
.get("imageId")
.cloned()
.ok_or_else(|| invalid_parameter("Missing imageId"))?;
let account = target_account_id(request, &body);
let (digest, layers, registry_id) = {
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
let digest = resolve_image_digest(repo, &image_id)
.ok_or_else(|| image_not_found(&name, &image_id))?;
repo.scan_findings.insert(
digest.clone(),
ImageScanFindings {
image_digest: digest.clone(),
scan_status: "IN_PROGRESS".to_string(),
scan_completed_at: None,
vulnerability_source_updated_at: None,
finding_severity_counts: BTreeMap::new(),
findings: Vec::new(),
},
);
let layers = layers_for_image(repo, &digest);
(digest, layers, repo.registry_id.clone())
};
let shared = self.state.clone();
let store = self.snapshot_store.clone();
let snap_lock = self.snapshot_lock.clone();
let account_for_task = account.clone();
let name_for_task = name.clone();
let digest_for_task = digest.clone();
tokio::spawn(async move {
let result = crate::scanner::scan_layers(&digest_for_task, &layers).await;
{
let mut accounts = shared.write();
let Some(state) = accounts.get_mut(&account_for_task) else {
return;
};
let Some(repo) = state.repositories.get_mut(&name_for_task) else {
return;
};
let findings = result.unwrap_or_else(|| ImageScanFindings {
image_digest: digest_for_task.clone(),
scan_status: "COMPLETE".to_string(),
scan_completed_at: Some(Utc::now()),
vulnerability_source_updated_at: Some(Utc::now()),
finding_severity_counts: BTreeMap::new(),
findings: Vec::new(),
});
repo.scan_findings.insert(digest_for_task.clone(), findings);
}
EcrService::save_snapshot_with(shared, store, snap_lock).await;
});
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"imageId": image_id,
"imageScanStatus": {"status": "IN_PROGRESS"},
})))
}
fn describe_image_scan_findings(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let image_id = body
.get("imageId")
.cloned()
.ok_or_else(|| invalid_parameter("Missing imageId"))?;
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let digest = resolve_image_digest(repo, &image_id)
.ok_or_else(|| image_not_found(&name, &image_id))?;
let findings = repo.scan_findings.get(&digest).cloned().unwrap_or_else(|| {
crate::state::ImageScanFindings {
image_digest: digest.clone(),
scan_status: "COMPLETE".to_string(),
scan_completed_at: Some(Utc::now()),
vulnerability_source_updated_at: Some(Utc::now()),
finding_severity_counts: BTreeMap::new(),
findings: Vec::new(),
}
});
Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageScanStatus": {"status": findings.scan_status},
"imageScanFindings": {
"imageScanCompletedAt": findings.scan_completed_at.map(|t| t.timestamp()),
"vulnerabilitySourceUpdatedAt": findings.vulnerability_source_updated_at.map(|t| t.timestamp()),
"findings": findings.findings,
"findingSeverityCounts": findings.finding_severity_counts,
},
})))
}
fn describe_registry(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let registry_id = state
.map(|s| s.account_id.clone())
.unwrap_or_else(|| account.clone());
let rules = state
.and_then(|s| s.replication_configuration.as_ref())
.map(|cfg| {
cfg.rules
.iter()
.map(|r| {
json!({
"destinations": r.destinations.iter().map(|d| json!({
"region": d.region,
"registryId": d.registry_id,
})).collect::<Vec<_>>(),
"repositoryFilters": r.repository_filters.iter().map(|f| json!({
"filter": f.filter,
"filterType": f.filter_type,
})).collect::<Vec<_>>(),
})
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"replicationConfiguration": {"rules": rules},
})))
}
fn get_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(registry_policy_not_found)?;
let policy = state
.registry_policy
.clone()
.ok_or_else(registry_policy_not_found)?;
Ok(AwsResponse::ok_json(json!({
"registryId": state.account_id,
"policyText": policy,
})))
}
fn put_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let policy = req_str(&body, "policyText")?.to_string();
if policy.len() > 10_240 {
return Err(invalid_parameter(format!(
"Value at 'policyText' failed to satisfy constraint: \
Member must have length less than or equal to 10240 (got {})",
policy.len()
)));
}
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.registry_policy = Some(policy.clone());
Ok(AwsResponse::ok_json(json!({
"registryId": state.account_id,
"policyText": policy,
})))
}
fn delete_registry_policy(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(registry_policy_not_found)?;
let policy = state
.registry_policy
.take()
.ok_or_else(registry_policy_not_found)?;
Ok(AwsResponse::ok_json(json!({
"registryId": state.account_id,
"policyText": policy,
})))
}
fn get_registry_scanning_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let cfg = state
.map(|s| s.registry_scanning_configuration.clone())
.unwrap_or_default();
let rules: Vec<Value> = cfg
.rules
.iter()
.map(|r| {
json!({
"scanFrequency": r.scan_frequency,
"repositoryFilters": r.repository_filters.iter().map(|f| json!({
"filter": f.filter,
"filterType": f.filter_type,
})).collect::<Vec<_>>(),
})
})
.collect();
Ok(AwsResponse::ok_json(json!({
"registryId": state.map(|s| s.account_id.clone()).unwrap_or(account),
"scanningConfiguration": {
"scanType": cfg.scan_type,
"rules": rules,
},
})))
}
fn put_registry_scanning_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use crate::state::{RegistryScanningConfiguration, RegistryScanningRule, RepositoryFilter};
let body = request.json_body();
let scan_type = opt_str(&body, "scanType").unwrap_or("BASIC").to_string();
if scan_type != "BASIC" && scan_type != "ENHANCED" {
return Err(invalid_parameter(format!(
"Invalid scanType '{scan_type}'. Must be BASIC or ENHANCED."
)));
}
let rules = body
.get("rules")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let parsed_rules: Vec<RegistryScanningRule> = rules
.iter()
.map(|r| RegistryScanningRule {
scan_frequency: r
.get("scanFrequency")
.and_then(|v| v.as_str())
.unwrap_or("SCAN_ON_PUSH")
.to_string(),
repository_filters: r
.get("repositoryFilters")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.map(|f| RepositoryFilter {
filter: f
.get("filter")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
filter_type: f
.get("filterType")
.and_then(|v| v.as_str())
.unwrap_or("WILDCARD")
.to_string(),
})
.collect()
})
.unwrap_or_default(),
})
.collect();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.registry_scanning_configuration = RegistryScanningConfiguration {
scan_type: scan_type.clone(),
rules: parsed_rules,
};
let cfg = state.registry_scanning_configuration.clone();
Ok(AwsResponse::ok_json(json!({
"registryScanningConfiguration": {
"scanType": cfg.scan_type,
"rules": cfg.rules.iter().map(|r| json!({
"scanFrequency": r.scan_frequency,
"repositoryFilters": r.repository_filters.iter().map(|f| json!({
"filter": f.filter,
"filterType": f.filter_type,
})).collect::<Vec<_>>(),
})).collect::<Vec<_>>(),
},
})))
}
fn batch_get_repository_scanning_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let names: Vec<String> = body
.get("repositoryNames")
.and_then(|v| v.as_array())
.ok_or_else(|| invalid_parameter("Missing required field: repositoryNames"))?
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&account))?;
let mut scanning: Vec<Value> = Vec::new();
let mut failures: Vec<Value> = Vec::new();
for n in &names {
match state.repositories.get(n) {
Some(repo) => scanning.push(json!({
"repositoryArn": repo.repository_arn,
"repositoryName": n,
"scanOnPush": repo.image_scanning_configuration.scan_on_push,
"scanFrequency": "SCAN_ON_PUSH",
"appliedScanFilters": [],
})),
None => failures.push(json!({
"repositoryName": n,
"failureCode": "REPOSITORY_NOT_FOUND",
"failureReason": format!("Repository '{n}' not found"),
})),
}
}
Ok(AwsResponse::ok_json(json!({
"scanningConfigurations": scanning,
"failures": failures,
})))
}
fn put_replication_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use crate::state::{
ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
};
let body = request.json_body();
let cfg_value = body
.get("replicationConfiguration")
.cloned()
.ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
let rules_value = cfg_value
.get("rules")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let rules: Vec<ReplicationRule> = rules_value
.iter()
.map(|r| ReplicationRule {
destinations: r
.get("destinations")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.map(|d| ReplicationDestination {
region: d
.get("region")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
registry_id: d
.get("registryId")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
})
.collect()
})
.unwrap_or_default(),
repository_filters: r
.get("repositoryFilters")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.map(|f| RepositoryFilter {
filter: f
.get("filter")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
filter_type: f
.get("filterType")
.and_then(|v| v.as_str())
.unwrap_or("PREFIX_MATCH")
.to_string(),
})
.collect()
})
.unwrap_or_default(),
})
.collect();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.replication_configuration = Some(ReplicationConfiguration { rules });
let cfg = state.replication_configuration.clone().unwrap();
Ok(AwsResponse::ok_json(json!({
"replicationConfiguration": {
"rules": cfg.rules.iter().map(|r| json!({
"destinations": r.destinations.iter().map(|d| json!({
"region": d.region,
"registryId": d.registry_id,
})).collect::<Vec<_>>(),
"repositoryFilters": r.repository_filters.iter().map(|f| json!({
"filter": f.filter,
"filterType": f.filter_type,
})).collect::<Vec<_>>(),
})).collect::<Vec<_>>(),
},
})))
}
fn describe_image_replication_status(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let image_id = body
.get("imageId")
.cloned()
.ok_or_else(|| invalid_parameter("Missing imageId"))?;
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let Some(digest) = resolve_image_digest(repo, &image_id) else {
return Err(image_not_found(&name, &image_id));
};
let statuses: Vec<Value> = repo
.replication_statuses
.get(&digest)
.map(|entries| {
entries
.iter()
.map(|s| {
let mut obj = serde_json::Map::new();
obj.insert("region".into(), Value::String(s.region.clone()));
obj.insert("registryId".into(), Value::String(s.registry_id.clone()));
obj.insert("status".into(), Value::String(s.status.clone()));
if let Some(ref code) = s.failure_code {
obj.insert("failureCode".into(), Value::String(code.clone()));
}
if let Some(ref reason) = s.failure_reason {
obj.insert("failureReason".into(), Value::String(reason.clone()));
}
Value::Object(obj)
})
.collect()
})
.unwrap_or_default();
Ok(AwsResponse::ok_json(json!({
"repositoryName": name,
"imageId": image_id,
"replicationStatuses": statuses,
})))
}
fn create_pull_through_cache_rule(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use crate::state::PullThroughCacheRule;
let body = request.json_body();
let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
validate_pullthrough_prefix(&prefix)?;
let upstream_url = req_str(&body, "upstreamRegistryUrl")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
if state.pull_through_cache_rules.contains_key(&prefix) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"PullThroughCacheRuleAlreadyExistsException",
format!("A pull through cache rule with the prefix '{prefix}' already exists."),
));
}
let now = Utc::now();
let rule = PullThroughCacheRule {
ecr_repository_prefix: prefix.clone(),
upstream_registry_url: upstream_url.clone(),
upstream_registry: opt_str(&body, "upstreamRegistry").map(|s| s.to_string()),
credential_arn: opt_str(&body, "credentialArn").map(|s| s.to_string()),
created_at: now,
updated_at: now,
custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
};
state
.pull_through_cache_rules
.insert(prefix.clone(), rule.clone());
Ok(AwsResponse::ok_json(pull_through_rule_json(
state.account_id.as_str(),
&rule,
)))
}
fn delete_pull_through_cache_rule(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
validate_pullthrough_prefix(&prefix)?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let removed = state
.pull_through_cache_rules
.remove(&prefix)
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"PullThroughCacheRuleNotFoundException",
format!("No pull through cache rule with prefix '{prefix}' exists."),
)
})?;
let mut response = pull_through_rule_json(state.account_id.as_str(), &removed);
if let Value::Object(ref mut map) = response {
map.remove("upstreamRegistry");
}
Ok(AwsResponse::ok_json(response))
}
fn describe_pull_through_cache_rules(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
validate_max_results(&body)?;
let prefixes: Vec<String> = body
.get("ecrRepositoryPrefixes")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let rules: Vec<&crate::state::PullThroughCacheRule> = state
.map(|s| s.pull_through_cache_rules.values().collect())
.unwrap_or_default();
let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
let filtered: Vec<Value> = rules
.iter()
.filter(|r| prefixes.is_empty() || prefixes.contains(&r.ecr_repository_prefix))
.map(|r| pull_through_rule_json_with_updated(®istry_id, r))
.collect();
Ok(AwsResponse::ok_json(json!({
"pullThroughCacheRules": filtered,
})))
}
fn update_pull_through_cache_rule(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let rule = state
.pull_through_cache_rules
.get_mut(&prefix)
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"PullThroughCacheRuleNotFoundException",
format!("No pull through cache rule with prefix '{prefix}' exists."),
)
})?;
if let Some(cred) = opt_str(&body, "credentialArn") {
rule.credential_arn = Some(cred.to_string());
}
if let Some(role) = opt_str(&body, "customRoleArn") {
rule.custom_role_arn = Some(role.to_string());
}
rule.updated_at = Utc::now();
let response = pull_through_rule_json_with_updated(state.account_id.as_str(), rule);
Ok(AwsResponse::ok_json(response))
}
fn validate_pull_through_cache_rule(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let prefix = req_str(&body, "ecrRepositoryPrefix")?.to_string();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let rule = state
.and_then(|s| s.pull_through_cache_rules.get(&prefix))
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"PullThroughCacheRuleNotFoundException",
format!("No pull through cache rule with prefix '{prefix}' exists."),
)
})?;
let registry_id = state.map(|s| s.account_id.clone()).unwrap_or_default();
let mut base = pull_through_rule_json(®istry_id, rule);
base["isValid"] = json!(true);
Ok(AwsResponse::ok_json(base))
}
fn get_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "name")?.to_string();
validate_account_setting_name(&name)?;
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let value = state
.and_then(|s| s.account_settings.get(&name).cloned())
.unwrap_or_else(|| "DISABLED".to_string());
Ok(AwsResponse::ok_json(json!({
"name": name,
"value": value,
})))
}
fn put_account_setting(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "name")?.to_string();
validate_account_setting_name(&name)?;
let value = req_str(&body, "value")?.to_string();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.account_settings.insert(name.clone(), value.clone());
Ok(AwsResponse::ok_json(json!({
"name": name,
"value": value,
})))
}
fn create_repository_creation_template(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use crate::state::{EncryptionConfiguration as Enc, RepositoryCreationTemplate};
let body = request.json_body();
let prefix = req_str(&body, "prefix")?.to_string();
validate_template_prefix(&prefix)?;
let applied_for: Vec<String> = body
.get("appliedFor")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let image_tag_mutability = opt_str(&body, "imageTagMutability")
.unwrap_or("MUTABLE")
.to_string();
let resource_tags = body
.get("resourceTags")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let encryption = body.get("encryptionConfiguration").map(|v| Enc {
encryption_type: v
.get("encryptionType")
.and_then(|x| x.as_str())
.unwrap_or("AES256")
.to_string(),
kms_key: v
.get("kmsKey")
.and_then(|x| x.as_str())
.map(|s| s.to_string()),
});
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
if state.repository_creation_templates.contains_key(&prefix) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"TemplateAlreadyExistsException",
format!(
"A repository creation template with the prefix '{prefix}' already exists."
),
));
}
let now = Utc::now();
let tpl = RepositoryCreationTemplate {
prefix: prefix.clone(),
description: opt_str(&body, "description").map(|s| s.to_string()),
image_tag_mutability,
applied_for,
resource_tags,
created_at: now,
updated_at: now,
custom_role_arn: opt_str(&body, "customRoleArn").map(|s| s.to_string()),
repository_policy: opt_str(&body, "repositoryPolicy").map(|s| s.to_string()),
lifecycle_policy: opt_str(&body, "lifecyclePolicy").map(|s| s.to_string()),
encryption_configuration: encryption,
};
state
.repository_creation_templates
.insert(prefix, tpl.clone());
Ok(AwsResponse::ok_json(json!({
"registryId": state.account_id,
"repositoryCreationTemplate": template_to_json(&tpl),
})))
}
fn delete_repository_creation_template(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let prefix = req_str(&body, "prefix")?.to_string();
validate_template_prefix(&prefix)?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let removed = state
.repository_creation_templates
.remove(&prefix)
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"TemplateNotFoundException",
format!("No repository creation template with prefix '{prefix}' exists."),
)
})?;
Ok(AwsResponse::ok_json(json!({
"registryId": state.account_id,
"repositoryCreationTemplate": template_to_json(&removed),
})))
}
fn describe_repository_creation_templates(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
validate_max_results(&body)?;
let prefixes: Vec<String> = body
.get("prefixes")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect()
})
.unwrap_or_default();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let tpls: Vec<Value> = state
.map(|s| {
s.repository_creation_templates
.values()
.filter(|t| prefixes.is_empty() || prefixes.contains(&t.prefix))
.map(template_to_json)
.collect()
})
.unwrap_or_default();
Ok(AwsResponse::ok_json(json!({
"registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
"repositoryCreationTemplates": tpls,
})))
}
fn update_repository_creation_template(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let prefix = req_str(&body, "prefix")?.to_string();
validate_template_prefix(&prefix)?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
let tpl = state
.repository_creation_templates
.get_mut(&prefix)
.ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"TemplateNotFoundException",
format!("No repository creation template with prefix '{prefix}' exists."),
)
})?;
if let Some(desc) = opt_str(&body, "description") {
tpl.description = Some(desc.to_string());
}
if let Some(mutability) = opt_str(&body, "imageTagMutability") {
tpl.image_tag_mutability = mutability.to_string();
}
if let Some(arr) = body.get("appliedFor").and_then(|v| v.as_array()) {
tpl.applied_for = arr
.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
}
if let Some(arr) = body.get("resourceTags").and_then(|v| v.as_array()) {
tpl.resource_tags = arr.clone();
}
tpl.updated_at = Utc::now();
Ok(AwsResponse::ok_json(json!({
"registryId": state.account_id,
"repositoryCreationTemplate": template_to_json(tpl),
})))
}
fn get_signing_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let rules: Vec<Value> = state
.and_then(|s| s.signing_configuration.as_ref())
.map(|c| c.rules.clone())
.unwrap_or_default();
Ok(AwsResponse::ok_json(json!({
"registryId": state.map(|s| s.account_id.clone()).unwrap_or_default(),
"signingConfiguration": {"rules": rules},
})))
}
fn put_signing_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use crate::signing::TrustedKey;
use crate::state::SigningConfiguration;
let body = request.json_body();
let cfg = body
.get("signingConfiguration")
.ok_or_else(|| invalid_parameter("Missing required field: signingConfiguration"))?;
let rules: Vec<Value> = cfg
.get("rules")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mut trusted_keys: Vec<TrustedKey> = Vec::new();
for rule in &rules {
let keys = match rule.get("trustedKeys").and_then(|v| v.as_array()) {
Some(k) => k,
None => continue,
};
for k in keys {
let key_id = k
.get("keyId")
.and_then(|v| v.as_str())
.unwrap_or_default()
.to_string();
let pem = match k.get("pem").and_then(|v| v.as_str()) {
Some(p) => p.to_string(),
None => continue,
};
let algorithm = k
.get("algorithm")
.and_then(|v| v.as_str())
.unwrap_or("ECDSA-P256")
.to_string();
if <p256::ecdsa::VerifyingKey as p256::pkcs8::DecodePublicKey>::from_public_key_pem(
&pem,
)
.is_err()
{
return Err(invalid_parameter(format!(
"trusted key {key_id} is not a valid ECDSA-P256 PEM-encoded public key"
)));
}
trusted_keys.push(TrustedKey {
key_id,
pem,
algorithm,
});
}
}
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.signing_configuration = Some(SigningConfiguration {
rules: rules.clone(),
trusted_keys,
});
Ok(AwsResponse::ok_json(json!({
"signingConfiguration": {"rules": rules},
})))
}
fn delete_signing_configuration(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.signing_configuration = None;
Ok(AwsResponse::ok_json(json!({})))
}
fn describe_image_signing_status(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let image_id = body
.get("imageId")
.cloned()
.ok_or_else(|| invalid_parameter("Missing imageId"))?;
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
let image_digest = resolve_image_digest(repo, &image_id)
.ok_or_else(|| image_not_found(&name, &image_id))?;
let trusted_keys: &[crate::signing::TrustedKey] = state
.signing_configuration
.as_ref()
.map(|c| c.trusted_keys.as_slice())
.unwrap_or(&[]);
let sig_tag = match crate::signing::companion_sig_tag(&image_digest) {
Some(t) => t,
None => {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "UNSIGNED",
})));
}
};
let sig_manifest_digest = match repo.image_tags.get(&sig_tag) {
Some(d) => d,
None => {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "UNSIGNED",
})));
}
};
let sig_image = match repo.images.get(sig_manifest_digest) {
Some(i) => i,
None => {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "UNSIGNED",
})));
}
};
let manifest_json: Value = match serde_json::from_str(&sig_image.image_manifest) {
Ok(v) => v,
Err(_) => {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "INVALID_SIGNATURE",
})));
}
};
let (layer_digest, signature_b64) =
match crate::signing::extract_signature_annotation(&manifest_json) {
Some(x) => x,
None => {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "UNSIGNED",
})));
}
};
let payload_bytes: Vec<u8> = match repo.layers.get(&layer_digest) {
Some(layer) => base64::Engine::decode(
&base64::engine::general_purpose::STANDARD,
layer.blob_b64.as_bytes(),
)
.unwrap_or_default(),
None => {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "UNSIGNED",
})));
}
};
if let Some(named) = crate::signing::referenced_image_digest(&payload_bytes) {
if named != image_digest {
return Ok(AwsResponse::ok_json(json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
"imageSignatures": [],
"signingStatus": "INVALID_SIGNATURE",
"statusReason": "signature payload references a different image digest",
})));
}
}
let mut matched: Option<&crate::signing::TrustedKey> = None;
for key in trusted_keys {
if crate::signing::verify_cosign_signature(&key.pem, &payload_bytes, &signature_b64)
.is_ok()
{
matched = Some(key);
break;
}
}
let mut response = json!({
"registryId": repo.registry_id,
"repositoryName": name,
"imageId": image_id,
});
if let Some(key) = matched {
response["imageSignatures"] = json!([{
"signatureFormat": "COSIGN",
"keyId": key.key_id,
"algorithm": key.algorithm,
"valid": true,
}]);
response["signingStatus"] = json!("SIGNED");
} else if trusted_keys.is_empty() {
response["imageSignatures"] = json!([{
"signatureFormat": "COSIGN",
"valid": false,
"statusReason": "no trusted keys configured"
}]);
response["signingStatus"] = json!("UNVERIFIED");
} else {
response["imageSignatures"] = json!([{
"signatureFormat": "COSIGN",
"valid": false,
"statusReason": "signature did not match any trusted key"
}]);
response["signingStatus"] = json!("INVALID_SIGNATURE");
}
Ok(AwsResponse::ok_json(response))
}
fn register_pull_time_update_exclusion(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
use crate::state::PullTimeExclusion;
let body = request.json_body();
let principal_arn = req_str(&body, "principalArn")?.to_string();
validate_string_length("principalArn", &principal_arn, 0, 200)?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state
.pull_time_exclusions
.entry(principal_arn.clone())
.or_insert_with(|| PullTimeExclusion {
principal_arn: principal_arn.clone(),
registered_at: Utc::now(),
});
Ok(AwsResponse::ok_json(json!({
"principalArn": principal_arn,
})))
}
fn deregister_pull_time_update_exclusion(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let principal_arn = req_str(&body, "principalArn")?.to_string();
validate_string_length("principalArn", &principal_arn, 0, 200)?;
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts.get_or_create(&account);
state.pull_time_exclusions.remove(&principal_arn);
Ok(AwsResponse::ok_json(json!({
"principalArn": principal_arn,
})))
}
fn list_pull_time_update_exclusions(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
validate_max_results(&body)?;
let max_results = body
.get("maxResults")
.and_then(|v| v.as_u64())
.map(|n| n as usize);
let offset = match body.get("nextToken").and_then(|v| v.as_str()) {
Some(raw) => raw.parse::<usize>().map_err(|_| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"InvalidParameterException",
"The specified parameter is invalid: nextToken",
)
})?,
None => 0,
};
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts.get(&account);
let mut all: Vec<Value> = state
.map(|s| {
s.pull_time_exclusions
.values()
.map(|e| {
json!({
"principalArn": e.principal_arn,
"registeredAt": e.registered_at.timestamp(),
})
})
.collect()
})
.unwrap_or_default();
let total = all.len();
let start = offset.min(total);
all.drain(..start);
let next_token = match max_results {
Some(n) if all.len() > n => {
all.truncate(n);
Some((start + n).to_string())
}
_ => None,
};
let mut out = json!({ "pullTimeUpdateExclusions": all });
if let Some(tok) = next_token {
out["nextToken"] = json!(tok);
}
Ok(AwsResponse::ok_json(out))
}
fn list_image_referrers(&self, request: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let subject = body
.get("subjectId")
.cloned()
.ok_or_else(|| invalid_parameter("Missing subjectId"))?;
let digest = subject
.get("imageDigest")
.and_then(|v| v.as_str())
.ok_or_else(|| invalid_parameter("subjectId.imageDigest is required"))?
.to_string();
let filter = body.get("filter").cloned().unwrap_or(Value::Null);
let artifact_type_filter: Option<Vec<String>> = filter
.get("artifactTypes")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(str::to_string))
.collect()
});
let artifact_status_filter: String = filter
.get("artifactStatus")
.and_then(|v| v.as_str())
.unwrap_or("ACTIVE")
.to_string();
let account = target_account_id(request, &body);
let accounts = self.state.read();
let state = accounts
.get(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get(&name)
.ok_or_else(|| repository_not_found(&name))?;
if !repo.images.contains_key(&digest) {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"ImageNotFoundException",
format!("Subject image {digest} not found in repository '{name}'"),
));
}
let mut referrers: Vec<Value> = Vec::new();
for image in repo.images.values() {
let parsed: Value = match serde_json::from_str(&image.image_manifest) {
Ok(v) => v,
Err(_) => continue,
};
let subject_digest = parsed
.get("subject")
.and_then(|s| s.get("digest"))
.and_then(|d| d.as_str());
if subject_digest != Some(digest.as_str()) {
continue;
}
let artifact_type = parsed
.get("artifactType")
.and_then(|v| v.as_str())
.or_else(|| {
parsed
.get("config")
.and_then(|c| c.get("mediaType"))
.and_then(|v| v.as_str())
})
.map(str::to_string);
if let Some(ref allowed) = artifact_type_filter {
if !allowed.iter().any(|t| Some(t) == artifact_type.as_ref()) {
continue;
}
}
if artifact_status_filter != "ANY" && image.image_status != artifact_status_filter {
continue;
}
let annotations = parsed
.get("annotations")
.cloned()
.unwrap_or_else(|| json!({}));
let mut referrer = json!({
"digest": image.image_digest,
"mediaType": image.image_manifest_media_type,
"size": image.image_size_in_bytes,
"annotations": annotations,
"artifactStatus": image.image_status,
});
if let Some(t) = artifact_type {
referrer["artifactType"] = json!(t);
}
referrers.push(referrer);
}
Ok(AwsResponse::ok_json(json!({
"referrers": referrers,
})))
}
fn update_image_storage_class(
&self,
request: &AwsRequest,
) -> Result<AwsResponse, AwsServiceError> {
let body = request.json_body();
let name = req_str(&body, "repositoryName")?.to_string();
let image_id = body
.get("imageId")
.cloned()
.ok_or_else(|| invalid_parameter("Missing imageId"))?;
let target_class = req_str(&body, "targetStorageClass")?.to_string();
if target_class != "STANDARD" && target_class != "ARCHIVE" {
return Err(invalid_parameter(format!(
"Invalid targetStorageClass '{target_class}'. Must be STANDARD or ARCHIVE."
)));
}
let account = target_account_id(request, &body);
let mut accounts = self.state.write();
let state = accounts
.get_mut(&account)
.ok_or_else(|| repository_not_found(&name))?;
let repo = state
.repositories
.get_mut(&name)
.ok_or_else(|| repository_not_found(&name))?;
let digest = resolve_image_digest(repo, &image_id)
.ok_or_else(|| image_not_found(&name, &image_id))?;
let registry_id = repo.registry_id.clone();
let now = Utc::now();
let image = repo.images.get_mut(&digest).expect("digest resolves");
let new_status = match target_class.as_str() {
"ARCHIVE" => {
image.last_archived_at = Some(now);
"ARCHIVED"
}
_ => {
image.last_activated_at = Some(now);
"ACTIVE"
}
};
image.image_status = new_status.to_string();
Ok(AwsResponse::ok_json(json!({
"registryId": registry_id,
"repositoryName": name,
"imageId": image_id,
"imageStatus": new_status,
})))
}
}
#[path = "service_helpers.rs"]
mod service_helpers;
pub use service_helpers::evaluate_lifecycle_policy;
pub(crate) use service_helpers::*;
#[cfg(test)]
#[path = "service_tests.rs"]
mod tests;