1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19 EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20 Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const SUPPORTED_ACTIONS: &[&str] = &[
24 "CreateRepository",
25 "DeleteRepository",
26 "DescribeRepositories",
27 "PutImageTagMutability",
28 "PutImageScanningConfiguration",
29 "SetRepositoryPolicy",
30 "GetRepositoryPolicy",
31 "DeleteRepositoryPolicy",
32 "TagResource",
33 "UntagResource",
34 "ListTagsForResource",
35 "PutImage",
36 "BatchGetImage",
37 "BatchDeleteImage",
38 "BatchCheckLayerAvailability",
39 "DescribeImages",
40 "ListImages",
41 "GetDownloadUrlForLayer",
42 "InitiateLayerUpload",
43 "UploadLayerPart",
44 "CompleteLayerUpload",
45 "GetAuthorizationToken",
46 "PutLifecyclePolicy",
47 "GetLifecyclePolicy",
48 "DeleteLifecyclePolicy",
49 "StartLifecyclePolicyPreview",
50 "GetLifecyclePolicyPreview",
51 "StartImageScan",
52 "DescribeImageScanFindings",
53 "DescribeRegistry",
54 "GetRegistryPolicy",
55 "PutRegistryPolicy",
56 "DeleteRegistryPolicy",
57 "GetRegistryScanningConfiguration",
58 "PutRegistryScanningConfiguration",
59 "BatchGetRepositoryScanningConfiguration",
60 "PutReplicationConfiguration",
61 "DescribeImageReplicationStatus",
62 "CreatePullThroughCacheRule",
63 "DeletePullThroughCacheRule",
64 "DescribePullThroughCacheRules",
65 "UpdatePullThroughCacheRule",
66 "ValidatePullThroughCacheRule",
67 "GetAccountSetting",
68 "PutAccountSetting",
69 "CreateRepositoryCreationTemplate",
70 "DeleteRepositoryCreationTemplate",
71 "DescribeRepositoryCreationTemplates",
72 "UpdateRepositoryCreationTemplate",
73 "GetSigningConfiguration",
74 "PutSigningConfiguration",
75 "DeleteSigningConfiguration",
76 "DescribeImageSigningStatus",
77 "RegisterPullTimeUpdateExclusion",
78 "DeregisterPullTimeUpdateExclusion",
79 "ListPullTimeUpdateExclusions",
80 "ListImageReferrers",
81 "UpdateImageStorageClass",
82];
83
84pub struct EcrService {
85 state: SharedEcrState,
86 snapshot_store: Option<Arc<dyn SnapshotStore>>,
87 snapshot_lock: Arc<AsyncMutex<()>>,
88 kms_state: Option<fakecloud_kms::SharedKmsState>,
93}
94
95mod images;
96mod layers;
97mod lifecycle;
98mod policies;
99mod pull_through;
100mod registry;
101mod repositories;
102mod scanning;
103mod settings;
104mod signing;
105mod tags;
106mod templates;
107
108impl EcrService {
109 pub fn new(state: SharedEcrState) -> Self {
110 Self {
111 state,
112 snapshot_store: None,
113 snapshot_lock: Arc::new(AsyncMutex::new(())),
114 kms_state: None,
115 }
116 }
117
118 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119 self.snapshot_store = Some(store);
120 self
121 }
122
123 pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
124 self.kms_state = Some(kms);
125 self
126 }
127
128 pub fn state_handle(&self) -> &SharedEcrState {
133 &self.state
134 }
135
136 pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
139 self.kms_state.as_ref()
140 }
141
142 async fn save_snapshot(&self) {
143 Self::save_snapshot_with(
144 self.state.clone(),
145 self.snapshot_store.clone(),
146 self.snapshot_lock.clone(),
147 )
148 .await
149 }
150
151 pub(crate) async fn save_snapshot_with(
155 state: SharedEcrState,
156 store: Option<Arc<dyn SnapshotStore>>,
157 lock: Arc<AsyncMutex<()>>,
158 ) {
159 let Some(store) = store else {
160 return;
161 };
162 let _guard = lock.lock().await;
163 let snapshot = EcrSnapshot {
164 schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
165 accounts: Some(state.read().clone()),
166 };
167 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
168 let bytes = serde_json::to_vec(&snapshot)
169 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
170 store.save(&bytes)
171 })
172 .await;
173 match join {
174 Ok(Ok(())) => {}
175 Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
176 Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
177 }
178 }
179}
180
181#[async_trait]
182impl AwsService for EcrService {
183 fn service_name(&self) -> &str {
184 "ecr"
185 }
186
187 async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
188 if request
197 .path_segments
198 .first()
199 .map(|s| s == "v2")
200 .unwrap_or(false)
201 {
202 let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
205 && request.path_segments.len() >= 5
206 && request.path_segments[request.path_segments.len() - 2] == "uploads";
207 if !is_blob_upload {
208 if let Some(stream) = request.take_body_stream() {
209 request.body = fakecloud_core::service::drain_request_stream(stream).await?;
210 }
211 }
212 let result = crate::oci::dispatch(self, &request).await;
213 let is_pull_get = request.method == http::Method::GET
219 && request.path_segments.len() >= 3
220 && matches!(
221 request.path_segments[request.path_segments.len() - 2].as_str(),
222 "blobs" | "manifests"
223 );
224 let mutates_oci = is_pull_get
225 || matches!(
226 request.method,
227 http::Method::POST
228 | http::Method::PUT
229 | http::Method::PATCH
230 | http::Method::DELETE
231 );
232 if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
233 self.save_snapshot().await;
234 }
235 return result;
236 }
237
238 if let Some(stream) = request.take_body_stream() {
240 request.body = fakecloud_core::service::drain_request_stream(stream).await?;
241 }
242
243 let mutates = is_mutating(request.action.as_str());
244 let result = match request.action.as_str() {
245 "CreateRepository" => self.create_repository(&request),
246 "DeleteRepository" => self.delete_repository(&request),
247 "DescribeRepositories" => self.describe_repositories(&request),
248 "PutImageTagMutability" => self.put_image_tag_mutability(&request),
249 "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
250 "SetRepositoryPolicy" => self.set_repository_policy(&request),
251 "GetRepositoryPolicy" => self.get_repository_policy(&request),
252 "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
253 "TagResource" => self.tag_resource(&request),
254 "UntagResource" => self.untag_resource(&request),
255 "ListTagsForResource" => self.list_tags_for_resource(&request),
256 "PutImage" => self.put_image(&request),
257 "BatchGetImage" => self.batch_get_image(&request),
258 "BatchDeleteImage" => self.batch_delete_image(&request),
259 "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
260 "DescribeImages" => self.describe_images(&request),
261 "ListImages" => self.list_images(&request),
262 "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
263 "InitiateLayerUpload" => self.initiate_layer_upload(&request),
264 "UploadLayerPart" => self.upload_layer_part(&request).await,
265 "CompleteLayerUpload" => self.complete_layer_upload(&request).await,
266 "GetAuthorizationToken" => self.get_authorization_token(&request),
267 "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
268 "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
269 "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
270 "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
271 "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
272 "StartImageScan" => self.start_image_scan(&request),
273 "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
274 "DescribeRegistry" => self.describe_registry(&request),
275 "GetRegistryPolicy" => self.get_registry_policy(&request),
276 "PutRegistryPolicy" => self.put_registry_policy(&request),
277 "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
278 "GetRegistryScanningConfiguration" => {
279 self.get_registry_scanning_configuration(&request)
280 }
281 "PutRegistryScanningConfiguration" => {
282 self.put_registry_scanning_configuration(&request)
283 }
284 "BatchGetRepositoryScanningConfiguration" => {
285 self.batch_get_repository_scanning_configuration(&request)
286 }
287 "PutReplicationConfiguration" => self.put_replication_configuration(&request),
288 "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
289 "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
290 "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
291 "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
292 "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
293 "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
294 "GetAccountSetting" => self.get_account_setting(&request),
295 "PutAccountSetting" => self.put_account_setting(&request),
296 "CreateRepositoryCreationTemplate" => {
297 self.create_repository_creation_template(&request)
298 }
299 "DeleteRepositoryCreationTemplate" => {
300 self.delete_repository_creation_template(&request)
301 }
302 "DescribeRepositoryCreationTemplates" => {
303 self.describe_repository_creation_templates(&request)
304 }
305 "UpdateRepositoryCreationTemplate" => {
306 self.update_repository_creation_template(&request)
307 }
308 "GetSigningConfiguration" => self.get_signing_configuration(&request),
309 "PutSigningConfiguration" => self.put_signing_configuration(&request),
310 "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
311 "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
312 "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
313 "DeregisterPullTimeUpdateExclusion" => {
314 self.deregister_pull_time_update_exclusion(&request)
315 }
316 "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
317 "ListImageReferrers" => self.list_image_referrers(&request),
318 "UpdateImageStorageClass" => self.update_image_storage_class(&request),
319 _ => Err(AwsServiceError::action_not_implemented(
320 self.service_name(),
321 &request.action,
322 )),
323 };
324 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
325 self.save_snapshot().await;
326 }
327 result
328 }
329
330 fn supported_actions(&self) -> &[&str] {
331 SUPPORTED_ACTIONS
332 }
333}
334
335impl EcrService {}
340
341impl EcrService {
346 fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
351 use crate::state::ImageScanFindings;
352 let layers = {
353 let mut accounts = self.state.write();
354 let Some(state) = accounts.get_mut(account) else {
355 return;
356 };
357 let Some(repo) = state.repositories.get_mut(name) else {
358 return;
359 };
360 repo.scan_findings.insert(
361 digest.to_string(),
362 ImageScanFindings {
363 image_digest: digest.to_string(),
364 scan_status: "IN_PROGRESS".to_string(),
365 scan_completed_at: None,
366 vulnerability_source_updated_at: None,
367 finding_severity_counts: BTreeMap::new(),
368 findings: Vec::new(),
369 },
370 );
371 layers_for_image(repo, digest)
372 };
373 let shared = self.state.clone();
374 let store = self.snapshot_store.clone();
375 let snap_lock = self.snapshot_lock.clone();
376 let account = account.to_string();
377 let name = name.to_string();
378 let digest = digest.to_string();
379 tokio::spawn(async move {
380 let result = crate::scanner::scan_layers(&digest, &layers).await;
381 {
382 let mut accounts = shared.write();
383 let Some(state) = accounts.get_mut(&account) else {
384 return;
385 };
386 let Some(repo) = state.repositories.get_mut(&name) else {
387 return;
388 };
389 let findings = result.unwrap_or_else(|| ImageScanFindings {
390 image_digest: digest.clone(),
391 scan_status: "COMPLETE".to_string(),
392 scan_completed_at: Some(Utc::now()),
393 vulnerability_source_updated_at: Some(Utc::now()),
394 finding_severity_counts: BTreeMap::new(),
395 findings: Vec::new(),
396 });
397 repo.scan_findings.insert(digest.clone(), findings);
398 }
399 EcrService::save_snapshot_with(shared, store, snap_lock).await;
400 });
401 }
402}
403
404impl EcrService {
407 fn put_replication_configuration(
408 &self,
409 request: &AwsRequest,
410 ) -> Result<AwsResponse, AwsServiceError> {
411 use crate::state::{
412 ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
413 };
414 let body = request.json_body();
415 let cfg_value = body
416 .get("replicationConfiguration")
417 .cloned()
418 .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
419 let rules_value = cfg_value
420 .get("rules")
421 .and_then(|v| v.as_array())
422 .cloned()
423 .unwrap_or_default();
424 let rules: Vec<ReplicationRule> = rules_value
425 .iter()
426 .map(|r| ReplicationRule {
427 destinations: r
428 .get("destinations")
429 .and_then(|v| v.as_array())
430 .map(|arr| {
431 arr.iter()
432 .map(|d| ReplicationDestination {
433 region: d
434 .get("region")
435 .and_then(|v| v.as_str())
436 .unwrap_or("")
437 .to_string(),
438 registry_id: d
439 .get("registryId")
440 .and_then(|v| v.as_str())
441 .unwrap_or("")
442 .to_string(),
443 })
444 .collect()
445 })
446 .unwrap_or_default(),
447 repository_filters: r
448 .get("repositoryFilters")
449 .and_then(|v| v.as_array())
450 .map(|arr| {
451 arr.iter()
452 .map(|f| RepositoryFilter {
453 filter: f
454 .get("filter")
455 .and_then(|v| v.as_str())
456 .unwrap_or("")
457 .to_string(),
458 filter_type: f
459 .get("filterType")
460 .and_then(|v| v.as_str())
461 .unwrap_or("PREFIX_MATCH")
462 .to_string(),
463 })
464 .collect()
465 })
466 .unwrap_or_default(),
467 })
468 .collect();
469 let account = target_account_id(request, &body);
470 let mut accounts = self.state.write();
471 let state = accounts.get_or_create(&account);
472 state.replication_configuration = Some(ReplicationConfiguration { rules });
473 let cfg = state.replication_configuration.clone().unwrap();
474 Ok(AwsResponse::ok_json(json!({
475 "replicationConfiguration": {
476 "rules": cfg.rules.iter().map(|r| json!({
477 "destinations": r.destinations.iter().map(|d| json!({
478 "region": d.region,
479 "registryId": d.registry_id,
480 })).collect::<Vec<_>>(),
481 "repositoryFilters": r.repository_filters.iter().map(|f| json!({
482 "filter": f.filter,
483 "filterType": f.filter_type,
484 })).collect::<Vec<_>>(),
485 })).collect::<Vec<_>>(),
486 },
487 })))
488 }
489}
490
491#[path = "../service_helpers.rs"]
492mod service_helpers;
493pub use service_helpers::evaluate_lifecycle_policy;
494pub(crate) use service_helpers::*;
495
496#[cfg(test)]
497#[path = "../service_tests.rs"]
498mod tests;