1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19 EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20 Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const SUPPORTED_ACTIONS: &[&str] = &[
24 "CreateRepository",
25 "DeleteRepository",
26 "DescribeRepositories",
27 "PutImageTagMutability",
28 "PutImageScanningConfiguration",
29 "SetRepositoryPolicy",
30 "GetRepositoryPolicy",
31 "DeleteRepositoryPolicy",
32 "TagResource",
33 "UntagResource",
34 "ListTagsForResource",
35 "PutImage",
36 "BatchGetImage",
37 "BatchDeleteImage",
38 "BatchCheckLayerAvailability",
39 "DescribeImages",
40 "ListImages",
41 "GetDownloadUrlForLayer",
42 "InitiateLayerUpload",
43 "UploadLayerPart",
44 "CompleteLayerUpload",
45 "GetAuthorizationToken",
46 "PutLifecyclePolicy",
47 "GetLifecyclePolicy",
48 "DeleteLifecyclePolicy",
49 "StartLifecyclePolicyPreview",
50 "GetLifecyclePolicyPreview",
51 "StartImageScan",
52 "DescribeImageScanFindings",
53 "DescribeRegistry",
54 "GetRegistryPolicy",
55 "PutRegistryPolicy",
56 "DeleteRegistryPolicy",
57 "GetRegistryScanningConfiguration",
58 "PutRegistryScanningConfiguration",
59 "BatchGetRepositoryScanningConfiguration",
60 "PutReplicationConfiguration",
61 "DescribeImageReplicationStatus",
62 "CreatePullThroughCacheRule",
63 "DeletePullThroughCacheRule",
64 "DescribePullThroughCacheRules",
65 "UpdatePullThroughCacheRule",
66 "ValidatePullThroughCacheRule",
67 "GetAccountSetting",
68 "PutAccountSetting",
69 "CreateRepositoryCreationTemplate",
70 "DeleteRepositoryCreationTemplate",
71 "DescribeRepositoryCreationTemplates",
72 "UpdateRepositoryCreationTemplate",
73 "GetSigningConfiguration",
74 "PutSigningConfiguration",
75 "DeleteSigningConfiguration",
76 "DescribeImageSigningStatus",
77 "RegisterPullTimeUpdateExclusion",
78 "DeregisterPullTimeUpdateExclusion",
79 "ListPullTimeUpdateExclusions",
80 "ListImageReferrers",
81 "UpdateImageStorageClass",
82];
83
84pub struct EcrService {
85 state: SharedEcrState,
86 snapshot_store: Option<Arc<dyn SnapshotStore>>,
87 snapshot_lock: Arc<AsyncMutex<()>>,
88 kms_state: Option<fakecloud_kms::SharedKmsState>,
93}
94
95mod images;
96mod layers;
97mod lifecycle;
98mod policies;
99mod pull_through;
100mod registry;
101mod repositories;
102mod scanning;
103mod settings;
104mod signing;
105mod tags;
106mod templates;
107
108impl EcrService {
109 pub fn new(state: SharedEcrState) -> Self {
110 Self {
111 state,
112 snapshot_store: None,
113 snapshot_lock: Arc::new(AsyncMutex::new(())),
114 kms_state: None,
115 }
116 }
117
118 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119 self.snapshot_store = Some(store);
120 self
121 }
122
123 pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
124 self.kms_state = Some(kms);
125 self
126 }
127
128 pub fn state_handle(&self) -> &SharedEcrState {
133 &self.state
134 }
135
136 pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
139 self.kms_state.as_ref()
140 }
141
142 async fn save_snapshot(&self) {
143 Self::save_snapshot_with(
144 self.state.clone(),
145 self.snapshot_store.clone(),
146 self.snapshot_lock.clone(),
147 )
148 .await;
149 }
150
151 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
155 let store = self.snapshot_store.clone()?;
156 let state = self.state.clone();
157 let lock = self.snapshot_lock.clone();
158 Some(Arc::new(move || {
159 let state = state.clone();
160 let store = store.clone();
161 let lock = lock.clone();
162 Box::pin(async move {
163 Self::save_snapshot_with(state, Some(store), lock).await;
164 })
165 }))
166 }
167
168 pub(crate) async fn save_snapshot_with(
172 state: SharedEcrState,
173 store: Option<Arc<dyn SnapshotStore>>,
174 lock: Arc<AsyncMutex<()>>,
175 ) {
176 let Some(store) = store else {
177 return;
178 };
179 let _guard = lock.lock().await;
180 let snapshot = EcrSnapshot {
181 schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
182 accounts: Some(state.read().clone()),
183 };
184 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
185 let bytes = serde_json::to_vec(&snapshot)
186 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
187 store.save(&bytes)
188 })
189 .await;
190 match join {
191 Ok(Ok(())) => {}
192 Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
193 Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
194 }
195 }
196}
197
198#[async_trait]
199impl AwsService for EcrService {
200 fn service_name(&self) -> &str {
201 "ecr"
202 }
203
204 async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
205 if request
214 .path_segments
215 .first()
216 .map(|s| s == "v2")
217 .unwrap_or(false)
218 {
219 let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
222 && request.path_segments.len() >= 5
223 && request.path_segments[request.path_segments.len() - 2] == "uploads";
224 if !is_blob_upload {
225 if let Some(stream) = request.take_body_stream() {
226 request.body = fakecloud_core::service::drain_request_stream(stream).await?;
227 }
228 }
229 let result = crate::oci::dispatch(self, &request).await;
230 let is_pull_get = request.method == http::Method::GET
236 && request.path_segments.len() >= 3
237 && matches!(
238 request.path_segments[request.path_segments.len() - 2].as_str(),
239 "blobs" | "manifests"
240 );
241 let mutates_oci = is_pull_get
242 || matches!(
243 request.method,
244 http::Method::POST
245 | http::Method::PUT
246 | http::Method::PATCH
247 | http::Method::DELETE
248 );
249 if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
250 self.save_snapshot().await;
251 }
252 return result;
253 }
254
255 if let Some(stream) = request.take_body_stream() {
257 request.body = fakecloud_core::service::drain_request_stream(stream).await?;
258 }
259
260 let mutates = is_mutating(request.action.as_str());
261 let result = match request.action.as_str() {
262 "CreateRepository" => self.create_repository(&request),
263 "DeleteRepository" => self.delete_repository(&request),
264 "DescribeRepositories" => self.describe_repositories(&request),
265 "PutImageTagMutability" => self.put_image_tag_mutability(&request),
266 "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
267 "SetRepositoryPolicy" => self.set_repository_policy(&request),
268 "GetRepositoryPolicy" => self.get_repository_policy(&request),
269 "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
270 "TagResource" => self.tag_resource(&request),
271 "UntagResource" => self.untag_resource(&request),
272 "ListTagsForResource" => self.list_tags_for_resource(&request),
273 "PutImage" => self.put_image(&request),
274 "BatchGetImage" => self.batch_get_image(&request),
275 "BatchDeleteImage" => self.batch_delete_image(&request),
276 "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
277 "DescribeImages" => self.describe_images(&request),
278 "ListImages" => self.list_images(&request),
279 "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
280 "InitiateLayerUpload" => self.initiate_layer_upload(&request),
281 "UploadLayerPart" => self.upload_layer_part(&request).await,
282 "CompleteLayerUpload" => self.complete_layer_upload(&request).await,
283 "GetAuthorizationToken" => self.get_authorization_token(&request),
284 "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
285 "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
286 "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
287 "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
288 "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
289 "StartImageScan" => self.start_image_scan(&request),
290 "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
291 "DescribeRegistry" => self.describe_registry(&request),
292 "GetRegistryPolicy" => self.get_registry_policy(&request),
293 "PutRegistryPolicy" => self.put_registry_policy(&request),
294 "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
295 "GetRegistryScanningConfiguration" => {
296 self.get_registry_scanning_configuration(&request)
297 }
298 "PutRegistryScanningConfiguration" => {
299 self.put_registry_scanning_configuration(&request)
300 }
301 "BatchGetRepositoryScanningConfiguration" => {
302 self.batch_get_repository_scanning_configuration(&request)
303 }
304 "PutReplicationConfiguration" => self.put_replication_configuration(&request),
305 "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
306 "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
307 "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
308 "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
309 "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
310 "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
311 "GetAccountSetting" => self.get_account_setting(&request),
312 "PutAccountSetting" => self.put_account_setting(&request),
313 "CreateRepositoryCreationTemplate" => {
314 self.create_repository_creation_template(&request)
315 }
316 "DeleteRepositoryCreationTemplate" => {
317 self.delete_repository_creation_template(&request)
318 }
319 "DescribeRepositoryCreationTemplates" => {
320 self.describe_repository_creation_templates(&request)
321 }
322 "UpdateRepositoryCreationTemplate" => {
323 self.update_repository_creation_template(&request)
324 }
325 "GetSigningConfiguration" => self.get_signing_configuration(&request),
326 "PutSigningConfiguration" => self.put_signing_configuration(&request),
327 "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
328 "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
329 "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
330 "DeregisterPullTimeUpdateExclusion" => {
331 self.deregister_pull_time_update_exclusion(&request)
332 }
333 "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
334 "ListImageReferrers" => self.list_image_referrers(&request),
335 "UpdateImageStorageClass" => self.update_image_storage_class(&request),
336 _ => Err(AwsServiceError::action_not_implemented(
337 self.service_name(),
338 &request.action,
339 )),
340 };
341 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
342 self.save_snapshot().await;
343 }
344 result
345 }
346
347 fn supported_actions(&self) -> &[&str] {
348 SUPPORTED_ACTIONS
349 }
350}
351
352impl EcrService {}
357
358impl EcrService {
363 fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
368 use crate::state::ImageScanFindings;
369 let layers = {
370 let mut accounts = self.state.write();
371 let Some(state) = accounts.get_mut(account) else {
372 return;
373 };
374 let Some(repo) = state.repositories.get_mut(name) else {
375 return;
376 };
377 repo.scan_findings.insert(
378 digest.to_string(),
379 ImageScanFindings {
380 image_digest: digest.to_string(),
381 scan_status: "IN_PROGRESS".to_string(),
382 scan_completed_at: None,
383 vulnerability_source_updated_at: None,
384 finding_severity_counts: BTreeMap::new(),
385 findings: Vec::new(),
386 },
387 );
388 layers_for_image(repo, digest)
389 };
390 let shared = self.state.clone();
391 let store = self.snapshot_store.clone();
392 let snap_lock = self.snapshot_lock.clone();
393 let account = account.to_string();
394 let name = name.to_string();
395 let digest = digest.to_string();
396 tokio::spawn(async move {
397 let result = crate::scanner::scan_layers(&digest, &layers).await;
398 {
399 let mut accounts = shared.write();
400 let Some(state) = accounts.get_mut(&account) else {
401 return;
402 };
403 let Some(repo) = state.repositories.get_mut(&name) else {
404 return;
405 };
406 let findings = result.unwrap_or_else(|| ImageScanFindings {
407 image_digest: digest.clone(),
408 scan_status: "COMPLETE".to_string(),
409 scan_completed_at: Some(Utc::now()),
410 vulnerability_source_updated_at: Some(Utc::now()),
411 finding_severity_counts: BTreeMap::new(),
412 findings: Vec::new(),
413 });
414 repo.scan_findings.insert(digest.clone(), findings);
415 }
416 EcrService::save_snapshot_with(shared, store, snap_lock).await;
417 });
418 }
419}
420
421impl EcrService {
424 fn put_replication_configuration(
425 &self,
426 request: &AwsRequest,
427 ) -> Result<AwsResponse, AwsServiceError> {
428 use crate::state::{
429 ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
430 };
431 let body = request.json_body();
432 let cfg_value = body
433 .get("replicationConfiguration")
434 .cloned()
435 .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
436 let rules_value = cfg_value
437 .get("rules")
438 .and_then(|v| v.as_array())
439 .cloned()
440 .unwrap_or_default();
441 let rules: Vec<ReplicationRule> = rules_value
442 .iter()
443 .map(|r| ReplicationRule {
444 destinations: r
445 .get("destinations")
446 .and_then(|v| v.as_array())
447 .map(|arr| {
448 arr.iter()
449 .map(|d| ReplicationDestination {
450 region: d
451 .get("region")
452 .and_then(|v| v.as_str())
453 .unwrap_or("")
454 .to_string(),
455 registry_id: d
456 .get("registryId")
457 .and_then(|v| v.as_str())
458 .unwrap_or("")
459 .to_string(),
460 })
461 .collect()
462 })
463 .unwrap_or_default(),
464 repository_filters: r
465 .get("repositoryFilters")
466 .and_then(|v| v.as_array())
467 .map(|arr| {
468 arr.iter()
469 .map(|f| RepositoryFilter {
470 filter: f
471 .get("filter")
472 .and_then(|v| v.as_str())
473 .unwrap_or("")
474 .to_string(),
475 filter_type: f
476 .get("filterType")
477 .and_then(|v| v.as_str())
478 .unwrap_or("PREFIX_MATCH")
479 .to_string(),
480 })
481 .collect()
482 })
483 .unwrap_or_default(),
484 })
485 .collect();
486 let account = target_account_id(request, &body);
487 let mut accounts = self.state.write();
488 let state = accounts.get_or_create(&account);
489 state.replication_configuration = Some(ReplicationConfiguration { rules });
490 let cfg = state.replication_configuration.clone().unwrap();
491 Ok(AwsResponse::ok_json(json!({
492 "replicationConfiguration": {
493 "rules": cfg.rules.iter().map(|r| json!({
494 "destinations": r.destinations.iter().map(|d| json!({
495 "region": d.region,
496 "registryId": d.registry_id,
497 })).collect::<Vec<_>>(),
498 "repositoryFilters": r.repository_filters.iter().map(|f| json!({
499 "filter": f.filter,
500 "filterType": f.filter_type,
501 })).collect::<Vec<_>>(),
502 })).collect::<Vec<_>>(),
503 },
504 })))
505 }
506}
507
508#[path = "../service_helpers.rs"]
509mod service_helpers;
510pub use service_helpers::evaluate_lifecycle_policy;
511pub(crate) use service_helpers::*;
512
513#[cfg(test)]
514#[path = "../service_tests.rs"]
515mod tests;