Skip to main content

fakecloud_ecr/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19    EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20    Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const SUPPORTED_ACTIONS: &[&str] = &[
24    "CreateRepository",
25    "DeleteRepository",
26    "DescribeRepositories",
27    "PutImageTagMutability",
28    "PutImageScanningConfiguration",
29    "SetRepositoryPolicy",
30    "GetRepositoryPolicy",
31    "DeleteRepositoryPolicy",
32    "TagResource",
33    "UntagResource",
34    "ListTagsForResource",
35    "PutImage",
36    "BatchGetImage",
37    "BatchDeleteImage",
38    "BatchCheckLayerAvailability",
39    "DescribeImages",
40    "ListImages",
41    "GetDownloadUrlForLayer",
42    "InitiateLayerUpload",
43    "UploadLayerPart",
44    "CompleteLayerUpload",
45    "GetAuthorizationToken",
46    "PutLifecyclePolicy",
47    "GetLifecyclePolicy",
48    "DeleteLifecyclePolicy",
49    "StartLifecyclePolicyPreview",
50    "GetLifecyclePolicyPreview",
51    "StartImageScan",
52    "DescribeImageScanFindings",
53    "DescribeRegistry",
54    "GetRegistryPolicy",
55    "PutRegistryPolicy",
56    "DeleteRegistryPolicy",
57    "GetRegistryScanningConfiguration",
58    "PutRegistryScanningConfiguration",
59    "BatchGetRepositoryScanningConfiguration",
60    "PutReplicationConfiguration",
61    "DescribeImageReplicationStatus",
62    "CreatePullThroughCacheRule",
63    "DeletePullThroughCacheRule",
64    "DescribePullThroughCacheRules",
65    "UpdatePullThroughCacheRule",
66    "ValidatePullThroughCacheRule",
67    "GetAccountSetting",
68    "PutAccountSetting",
69    "CreateRepositoryCreationTemplate",
70    "DeleteRepositoryCreationTemplate",
71    "DescribeRepositoryCreationTemplates",
72    "UpdateRepositoryCreationTemplate",
73    "GetSigningConfiguration",
74    "PutSigningConfiguration",
75    "DeleteSigningConfiguration",
76    "DescribeImageSigningStatus",
77    "RegisterPullTimeUpdateExclusion",
78    "DeregisterPullTimeUpdateExclusion",
79    "ListPullTimeUpdateExclusions",
80    "ListImageReferrers",
81    "UpdateImageStorageClass",
82];
83
84pub struct EcrService {
85    state: SharedEcrState,
86    snapshot_store: Option<Arc<dyn SnapshotStore>>,
87    snapshot_lock: Arc<AsyncMutex<()>>,
88    /// KMS state handle — when wired, repositories configured with
89    /// `EncryptionConfiguration.encryption_type == "KMS"` store layer
90    /// blobs encrypted under the configured CMK via
91    /// `fakecloud_kms::api::encrypt_blob` / `decrypt_blob`.
92    kms_state: Option<fakecloud_kms::SharedKmsState>,
93}
94
95mod images;
96mod layers;
97mod lifecycle;
98mod policies;
99mod pull_through;
100mod registry;
101mod repositories;
102mod scanning;
103mod settings;
104mod signing;
105mod tags;
106mod templates;
107
108impl EcrService {
109    pub fn new(state: SharedEcrState) -> Self {
110        Self {
111            state,
112            snapshot_store: None,
113            snapshot_lock: Arc::new(AsyncMutex::new(())),
114            kms_state: None,
115        }
116    }
117
118    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119        self.snapshot_store = Some(store);
120        self
121    }
122
123    pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
124        self.kms_state = Some(kms);
125        self
126    }
127
128    /// Read-only accessor for the multi-account state. The sibling
129    /// `oci` module owns the HTTP-layer adapter for the OCI v2
130    /// protocol and needs to reach the same repositories + blobs the
131    /// JSON control-plane ops read and write.
132    pub fn state_handle(&self) -> &SharedEcrState {
133        &self.state
134    }
135
136    /// Handle for the shared KMS state when wired. `None` skips the
137    /// encrypt/decrypt paths and stores / returns plaintext blobs.
138    pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
139        self.kms_state.as_ref()
140    }
141
142    async fn save_snapshot(&self) {
143        Self::save_snapshot_with(
144            self.state.clone(),
145            self.snapshot_store.clone(),
146            self.snapshot_lock.clone(),
147        )
148        .await;
149    }
150
151    /// Build a hook that persists the current state when invoked, or `None` in
152    /// memory mode. The CloudFormation provisioner mutates `state` directly and
153    /// uses this to write a CFN-provisioned resource through to disk.
154    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
155        let store = self.snapshot_store.clone()?;
156        let state = self.state.clone();
157        let lock = self.snapshot_lock.clone();
158        Some(Arc::new(move || {
159            let state = state.clone();
160            let store = store.clone();
161            let lock = lock.clone();
162            Box::pin(async move {
163                Self::save_snapshot_with(state, Some(store), lock).await;
164            })
165        }))
166    }
167
168    /// Snapshot writer reachable from background tasks (e.g. the async
169    /// image-scanner) that don't hold a `&self` reference. Equivalent
170    /// to [`save_snapshot`] but takes the components as owned clones.
171    pub(crate) async fn save_snapshot_with(
172        state: SharedEcrState,
173        store: Option<Arc<dyn SnapshotStore>>,
174        lock: Arc<AsyncMutex<()>>,
175    ) {
176        let Some(store) = store else {
177            return;
178        };
179        let _guard = lock.lock().await;
180        let snapshot = EcrSnapshot {
181            schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
182            accounts: Some(state.read().clone()),
183        };
184        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
185            let bytes = serde_json::to_vec(&snapshot)
186                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
187            store.save(&bytes)
188        })
189        .await;
190        match join {
191            Ok(Ok(())) => {}
192            Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
193            Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
194        }
195    }
196}
197
198#[async_trait]
199impl AwsService for EcrService {
200    fn service_name(&self) -> &str {
201        "ecr"
202    }
203
204    async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
205        // OCI v2 Distribution requests come in as path-only REST
206        // (`/v2/...` with no `X-Amz-Target`). Dispatch them before the
207        // JSON control plane. Blob upload PATCH/PUT consume the raw
208        // body stream directly via a per-upload spool file — they
209        // never call `drain_request_stream`, so a 1 GiB layer push
210        // moves through fakecloud in constant memory. Other OCI routes
211        // (manifest PUT, blob HEAD/GET, …) keep using `request.body`
212        // so we drain the stream conditionally.
213        if request
214            .path_segments
215            .first()
216            .map(|s| s == "v2")
217            .unwrap_or(false)
218        {
219            // Drain unless this is a blob-upload PATCH/PUT — those
220            // routes own the streaming consumer.
221            let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
222                && request.path_segments.len() >= 5
223                && request.path_segments[request.path_segments.len() - 2] == "uploads";
224            if !is_blob_upload {
225                if let Some(stream) = request.take_body_stream() {
226                    request.body = fakecloud_core::service::drain_request_stream(stream).await?;
227                }
228            }
229            let result = crate::oci::dispatch(self, &request).await;
230            // POST/PUT/PATCH/DELETE always mutate. GET to a `blobs/<digest>`
231            // or `manifests/<reference>` endpoint also mutates because those
232            // handlers bump the touched image's `last_in_use_at` /
233            // `in_use_count` / `last_recorded_pull_time`. `tags/list` (also
234            // GET) is read-only and excluded.
235            let is_pull_get = request.method == http::Method::GET
236                && request.path_segments.len() >= 3
237                && matches!(
238                    request.path_segments[request.path_segments.len() - 2].as_str(),
239                    "blobs" | "manifests"
240                );
241            let mutates_oci = is_pull_get
242                || matches!(
243                    request.method,
244                    http::Method::POST
245                        | http::Method::PUT
246                        | http::Method::PATCH
247                        | http::Method::DELETE
248                );
249            if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
250                self.save_snapshot().await;
251            }
252            return result;
253        }
254
255        // JSON control plane: actions read `request.body`, so drain.
256        if let Some(stream) = request.take_body_stream() {
257            request.body = fakecloud_core::service::drain_request_stream(stream).await?;
258        }
259
260        let mutates = is_mutating(request.action.as_str());
261        let result = match request.action.as_str() {
262            "CreateRepository" => self.create_repository(&request),
263            "DeleteRepository" => self.delete_repository(&request),
264            "DescribeRepositories" => self.describe_repositories(&request),
265            "PutImageTagMutability" => self.put_image_tag_mutability(&request),
266            "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
267            "SetRepositoryPolicy" => self.set_repository_policy(&request),
268            "GetRepositoryPolicy" => self.get_repository_policy(&request),
269            "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
270            "TagResource" => self.tag_resource(&request),
271            "UntagResource" => self.untag_resource(&request),
272            "ListTagsForResource" => self.list_tags_for_resource(&request),
273            "PutImage" => self.put_image(&request),
274            "BatchGetImage" => self.batch_get_image(&request),
275            "BatchDeleteImage" => self.batch_delete_image(&request),
276            "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
277            "DescribeImages" => self.describe_images(&request),
278            "ListImages" => self.list_images(&request),
279            "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
280            "InitiateLayerUpload" => self.initiate_layer_upload(&request),
281            "UploadLayerPart" => self.upload_layer_part(&request).await,
282            "CompleteLayerUpload" => self.complete_layer_upload(&request).await,
283            "GetAuthorizationToken" => self.get_authorization_token(&request),
284            "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
285            "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
286            "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
287            "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
288            "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
289            "StartImageScan" => self.start_image_scan(&request),
290            "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
291            "DescribeRegistry" => self.describe_registry(&request),
292            "GetRegistryPolicy" => self.get_registry_policy(&request),
293            "PutRegistryPolicy" => self.put_registry_policy(&request),
294            "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
295            "GetRegistryScanningConfiguration" => {
296                self.get_registry_scanning_configuration(&request)
297            }
298            "PutRegistryScanningConfiguration" => {
299                self.put_registry_scanning_configuration(&request)
300            }
301            "BatchGetRepositoryScanningConfiguration" => {
302                self.batch_get_repository_scanning_configuration(&request)
303            }
304            "PutReplicationConfiguration" => self.put_replication_configuration(&request),
305            "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
306            "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
307            "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
308            "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
309            "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
310            "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
311            "GetAccountSetting" => self.get_account_setting(&request),
312            "PutAccountSetting" => self.put_account_setting(&request),
313            "CreateRepositoryCreationTemplate" => {
314                self.create_repository_creation_template(&request)
315            }
316            "DeleteRepositoryCreationTemplate" => {
317                self.delete_repository_creation_template(&request)
318            }
319            "DescribeRepositoryCreationTemplates" => {
320                self.describe_repository_creation_templates(&request)
321            }
322            "UpdateRepositoryCreationTemplate" => {
323                self.update_repository_creation_template(&request)
324            }
325            "GetSigningConfiguration" => self.get_signing_configuration(&request),
326            "PutSigningConfiguration" => self.put_signing_configuration(&request),
327            "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
328            "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
329            "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
330            "DeregisterPullTimeUpdateExclusion" => {
331                self.deregister_pull_time_update_exclusion(&request)
332            }
333            "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
334            "ListImageReferrers" => self.list_image_referrers(&request),
335            "UpdateImageStorageClass" => self.update_image_storage_class(&request),
336            _ => Err(AwsServiceError::action_not_implemented(
337                self.service_name(),
338                &request.action,
339            )),
340        };
341        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
342            self.save_snapshot().await;
343        }
344        result
345    }
346
347    fn supported_actions(&self) -> &[&str] {
348        SUPPORTED_ACTIONS
349    }
350}
351
352// -------- helpers --------
353
354// -------- operations --------
355
356impl EcrService {}
357
358// -------- image + layer helpers --------
359
360// -------- image + layer operations --------
361
362impl EcrService {
363    /// Mark `digest` as `IN_PROGRESS` and spawn the scanner task.
364    /// Identical wiring to `start_image_scan` minus the request-shaped
365    /// response — used both by the user-facing `StartImageScan` and the
366    /// `scan_on_push=true` PutImage hook.
367    fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
368        use crate::state::ImageScanFindings;
369        let layers = {
370            let mut accounts = self.state.write();
371            let Some(state) = accounts.get_mut(account) else {
372                return;
373            };
374            let Some(repo) = state.repositories.get_mut(name) else {
375                return;
376            };
377            repo.scan_findings.insert(
378                digest.to_string(),
379                ImageScanFindings {
380                    image_digest: digest.to_string(),
381                    scan_status: "IN_PROGRESS".to_string(),
382                    scan_completed_at: None,
383                    vulnerability_source_updated_at: None,
384                    finding_severity_counts: BTreeMap::new(),
385                    findings: Vec::new(),
386                },
387            );
388            layers_for_image(repo, digest)
389        };
390        let shared = self.state.clone();
391        let store = self.snapshot_store.clone();
392        let snap_lock = self.snapshot_lock.clone();
393        let account = account.to_string();
394        let name = name.to_string();
395        let digest = digest.to_string();
396        tokio::spawn(async move {
397            let result = crate::scanner::scan_layers(&digest, &layers).await;
398            {
399                let mut accounts = shared.write();
400                let Some(state) = accounts.get_mut(&account) else {
401                    return;
402                };
403                let Some(repo) = state.repositories.get_mut(&name) else {
404                    return;
405                };
406                let findings = result.unwrap_or_else(|| ImageScanFindings {
407                    image_digest: digest.clone(),
408                    scan_status: "COMPLETE".to_string(),
409                    scan_completed_at: Some(Utc::now()),
410                    vulnerability_source_updated_at: Some(Utc::now()),
411                    finding_severity_counts: BTreeMap::new(),
412                    findings: Vec::new(),
413                });
414                repo.scan_findings.insert(digest.clone(), findings);
415            }
416            EcrService::save_snapshot_with(shared, store, snap_lock).await;
417        });
418    }
419}
420
421// -------- lifecycle + scan + registry + polish handlers (Batch 4) --------
422
423impl EcrService {
424    fn put_replication_configuration(
425        &self,
426        request: &AwsRequest,
427    ) -> Result<AwsResponse, AwsServiceError> {
428        use crate::state::{
429            ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
430        };
431        let body = request.json_body();
432        let cfg_value = body
433            .get("replicationConfiguration")
434            .cloned()
435            .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
436        let rules_value = cfg_value
437            .get("rules")
438            .and_then(|v| v.as_array())
439            .cloned()
440            .unwrap_or_default();
441        let rules: Vec<ReplicationRule> = rules_value
442            .iter()
443            .map(|r| ReplicationRule {
444                destinations: r
445                    .get("destinations")
446                    .and_then(|v| v.as_array())
447                    .map(|arr| {
448                        arr.iter()
449                            .map(|d| ReplicationDestination {
450                                region: d
451                                    .get("region")
452                                    .and_then(|v| v.as_str())
453                                    .unwrap_or("")
454                                    .to_string(),
455                                registry_id: d
456                                    .get("registryId")
457                                    .and_then(|v| v.as_str())
458                                    .unwrap_or("")
459                                    .to_string(),
460                            })
461                            .collect()
462                    })
463                    .unwrap_or_default(),
464                repository_filters: r
465                    .get("repositoryFilters")
466                    .and_then(|v| v.as_array())
467                    .map(|arr| {
468                        arr.iter()
469                            .map(|f| RepositoryFilter {
470                                filter: f
471                                    .get("filter")
472                                    .and_then(|v| v.as_str())
473                                    .unwrap_or("")
474                                    .to_string(),
475                                filter_type: f
476                                    .get("filterType")
477                                    .and_then(|v| v.as_str())
478                                    .unwrap_or("PREFIX_MATCH")
479                                    .to_string(),
480                            })
481                            .collect()
482                    })
483                    .unwrap_or_default(),
484            })
485            .collect();
486        let account = target_account_id(request, &body);
487        let mut accounts = self.state.write();
488        let state = accounts.get_or_create(&account);
489        state.replication_configuration = Some(ReplicationConfiguration { rules });
490        let cfg = state.replication_configuration.clone().unwrap();
491        Ok(AwsResponse::ok_json(json!({
492            "replicationConfiguration": {
493                "rules": cfg.rules.iter().map(|r| json!({
494                    "destinations": r.destinations.iter().map(|d| json!({
495                        "region": d.region,
496                        "registryId": d.registry_id,
497                    })).collect::<Vec<_>>(),
498                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
499                        "filter": f.filter,
500                        "filterType": f.filter_type,
501                    })).collect::<Vec<_>>(),
502                })).collect::<Vec<_>>(),
503            },
504        })))
505    }
506}
507
508#[path = "../service_helpers.rs"]
509mod service_helpers;
510pub use service_helpers::evaluate_lifecycle_policy;
511pub(crate) use service_helpers::*;
512
513#[cfg(test)]
514#[path = "../service_tests.rs"]
515mod tests;