Skip to main content

fakecloud_ecr/service/
mod.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::engine::general_purpose::STANDARD as B64;
6use base64::Engine;
7use chrono::Utc;
8use http::StatusCode;
9use serde_json::{json, Map, Value};
10use sha2::{Digest, Sha256};
11use tokio::sync::Mutex as AsyncMutex;
12use uuid::Uuid;
13
14use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
15use fakecloud_core::validation::validate_string_length;
16use fakecloud_persistence::SnapshotStore;
17
18use crate::state::{
19    EcrSnapshot, EncryptionConfiguration, Image, ImageScanningConfiguration, Layer, LayerUpload,
20    Repository, SharedEcrState, ECR_SNAPSHOT_SCHEMA_VERSION,
21};
22
23const SUPPORTED_ACTIONS: &[&str] = &[
24    "CreateRepository",
25    "DeleteRepository",
26    "DescribeRepositories",
27    "PutImageTagMutability",
28    "PutImageScanningConfiguration",
29    "SetRepositoryPolicy",
30    "GetRepositoryPolicy",
31    "DeleteRepositoryPolicy",
32    "TagResource",
33    "UntagResource",
34    "ListTagsForResource",
35    "PutImage",
36    "BatchGetImage",
37    "BatchDeleteImage",
38    "BatchCheckLayerAvailability",
39    "DescribeImages",
40    "ListImages",
41    "GetDownloadUrlForLayer",
42    "InitiateLayerUpload",
43    "UploadLayerPart",
44    "CompleteLayerUpload",
45    "GetAuthorizationToken",
46    "PutLifecyclePolicy",
47    "GetLifecyclePolicy",
48    "DeleteLifecyclePolicy",
49    "StartLifecyclePolicyPreview",
50    "GetLifecyclePolicyPreview",
51    "StartImageScan",
52    "DescribeImageScanFindings",
53    "DescribeRegistry",
54    "GetRegistryPolicy",
55    "PutRegistryPolicy",
56    "DeleteRegistryPolicy",
57    "GetRegistryScanningConfiguration",
58    "PutRegistryScanningConfiguration",
59    "BatchGetRepositoryScanningConfiguration",
60    "PutReplicationConfiguration",
61    "DescribeImageReplicationStatus",
62    "CreatePullThroughCacheRule",
63    "DeletePullThroughCacheRule",
64    "DescribePullThroughCacheRules",
65    "UpdatePullThroughCacheRule",
66    "ValidatePullThroughCacheRule",
67    "GetAccountSetting",
68    "PutAccountSetting",
69    "CreateRepositoryCreationTemplate",
70    "DeleteRepositoryCreationTemplate",
71    "DescribeRepositoryCreationTemplates",
72    "UpdateRepositoryCreationTemplate",
73    "GetSigningConfiguration",
74    "PutSigningConfiguration",
75    "DeleteSigningConfiguration",
76    "DescribeImageSigningStatus",
77    "RegisterPullTimeUpdateExclusion",
78    "DeregisterPullTimeUpdateExclusion",
79    "ListPullTimeUpdateExclusions",
80    "ListImageReferrers",
81    "UpdateImageStorageClass",
82];
83
84pub struct EcrService {
85    state: SharedEcrState,
86    snapshot_store: Option<Arc<dyn SnapshotStore>>,
87    snapshot_lock: Arc<AsyncMutex<()>>,
88    /// KMS state handle — when wired, repositories configured with
89    /// `EncryptionConfiguration.encryption_type == "KMS"` store layer
90    /// blobs encrypted under the configured CMK via
91    /// `fakecloud_kms::api::encrypt_blob` / `decrypt_blob`.
92    kms_state: Option<fakecloud_kms::SharedKmsState>,
93}
94
95mod images;
96mod layers;
97mod lifecycle;
98mod policies;
99mod pull_through;
100mod registry;
101mod repositories;
102mod scanning;
103mod settings;
104mod signing;
105mod tags;
106mod templates;
107
108impl EcrService {
109    pub fn new(state: SharedEcrState) -> Self {
110        Self {
111            state,
112            snapshot_store: None,
113            snapshot_lock: Arc::new(AsyncMutex::new(())),
114            kms_state: None,
115        }
116    }
117
118    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
119        self.snapshot_store = Some(store);
120        self
121    }
122
123    pub fn with_kms(mut self, kms: fakecloud_kms::SharedKmsState) -> Self {
124        self.kms_state = Some(kms);
125        self
126    }
127
128    /// Read-only accessor for the multi-account state. The sibling
129    /// `oci` module owns the HTTP-layer adapter for the OCI v2
130    /// protocol and needs to reach the same repositories + blobs the
131    /// JSON control-plane ops read and write.
132    pub fn state_handle(&self) -> &SharedEcrState {
133        &self.state
134    }
135
136    /// Handle for the shared KMS state when wired. `None` skips the
137    /// encrypt/decrypt paths and stores / returns plaintext blobs.
138    pub(crate) fn kms_handle(&self) -> Option<&fakecloud_kms::SharedKmsState> {
139        self.kms_state.as_ref()
140    }
141
142    async fn save_snapshot(&self) {
143        Self::save_snapshot_with(
144            self.state.clone(),
145            self.snapshot_store.clone(),
146            self.snapshot_lock.clone(),
147        )
148        .await
149    }
150
151    /// Snapshot writer reachable from background tasks (e.g. the async
152    /// image-scanner) that don't hold a `&self` reference. Equivalent
153    /// to [`save_snapshot`] but takes the components as owned clones.
154    pub(crate) async fn save_snapshot_with(
155        state: SharedEcrState,
156        store: Option<Arc<dyn SnapshotStore>>,
157        lock: Arc<AsyncMutex<()>>,
158    ) {
159        let Some(store) = store else {
160            return;
161        };
162        let _guard = lock.lock().await;
163        let snapshot = EcrSnapshot {
164            schema_version: ECR_SNAPSHOT_SCHEMA_VERSION,
165            accounts: Some(state.read().clone()),
166        };
167        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
168            let bytes = serde_json::to_vec(&snapshot)
169                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
170            store.save(&bytes)
171        })
172        .await;
173        match join {
174            Ok(Ok(())) => {}
175            Ok(Err(err)) => tracing::error!(%err, "failed to write ecr snapshot"),
176            Err(err) => tracing::error!(%err, "ecr snapshot task panicked"),
177        }
178    }
179}
180
181#[async_trait]
182impl AwsService for EcrService {
183    fn service_name(&self) -> &str {
184        "ecr"
185    }
186
187    async fn handle(&self, mut request: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
188        // OCI v2 Distribution requests come in as path-only REST
189        // (`/v2/...` with no `X-Amz-Target`). Dispatch them before the
190        // JSON control plane. Blob upload PATCH/PUT consume the raw
191        // body stream directly via a per-upload spool file — they
192        // never call `drain_request_stream`, so a 1 GiB layer push
193        // moves through fakecloud in constant memory. Other OCI routes
194        // (manifest PUT, blob HEAD/GET, …) keep using `request.body`
195        // so we drain the stream conditionally.
196        if request
197            .path_segments
198            .first()
199            .map(|s| s == "v2")
200            .unwrap_or(false)
201        {
202            // Drain unless this is a blob-upload PATCH/PUT — those
203            // routes own the streaming consumer.
204            let is_blob_upload = matches!(request.method, http::Method::PATCH | http::Method::PUT)
205                && request.path_segments.len() >= 5
206                && request.path_segments[request.path_segments.len() - 2] == "uploads";
207            if !is_blob_upload {
208                if let Some(stream) = request.take_body_stream() {
209                    request.body = fakecloud_core::service::drain_request_stream(stream).await?;
210                }
211            }
212            let result = crate::oci::dispatch(self, &request).await;
213            // POST/PUT/PATCH/DELETE always mutate. GET to a `blobs/<digest>`
214            // or `manifests/<reference>` endpoint also mutates because those
215            // handlers bump the touched image's `last_in_use_at` /
216            // `in_use_count` / `last_recorded_pull_time`. `tags/list` (also
217            // GET) is read-only and excluded.
218            let is_pull_get = request.method == http::Method::GET
219                && request.path_segments.len() >= 3
220                && matches!(
221                    request.path_segments[request.path_segments.len() - 2].as_str(),
222                    "blobs" | "manifests"
223                );
224            let mutates_oci = is_pull_get
225                || matches!(
226                    request.method,
227                    http::Method::POST
228                        | http::Method::PUT
229                        | http::Method::PATCH
230                        | http::Method::DELETE
231                );
232            if mutates_oci && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
233                self.save_snapshot().await;
234            }
235            return result;
236        }
237
238        // JSON control plane: actions read `request.body`, so drain.
239        if let Some(stream) = request.take_body_stream() {
240            request.body = fakecloud_core::service::drain_request_stream(stream).await?;
241        }
242
243        let mutates = is_mutating(request.action.as_str());
244        let result = match request.action.as_str() {
245            "CreateRepository" => self.create_repository(&request),
246            "DeleteRepository" => self.delete_repository(&request),
247            "DescribeRepositories" => self.describe_repositories(&request),
248            "PutImageTagMutability" => self.put_image_tag_mutability(&request),
249            "PutImageScanningConfiguration" => self.put_image_scanning_configuration(&request),
250            "SetRepositoryPolicy" => self.set_repository_policy(&request),
251            "GetRepositoryPolicy" => self.get_repository_policy(&request),
252            "DeleteRepositoryPolicy" => self.delete_repository_policy(&request),
253            "TagResource" => self.tag_resource(&request),
254            "UntagResource" => self.untag_resource(&request),
255            "ListTagsForResource" => self.list_tags_for_resource(&request),
256            "PutImage" => self.put_image(&request),
257            "BatchGetImage" => self.batch_get_image(&request),
258            "BatchDeleteImage" => self.batch_delete_image(&request),
259            "BatchCheckLayerAvailability" => self.batch_check_layer_availability(&request),
260            "DescribeImages" => self.describe_images(&request),
261            "ListImages" => self.list_images(&request),
262            "GetDownloadUrlForLayer" => self.get_download_url_for_layer(&request),
263            "InitiateLayerUpload" => self.initiate_layer_upload(&request),
264            "UploadLayerPart" => self.upload_layer_part(&request),
265            "CompleteLayerUpload" => self.complete_layer_upload(&request),
266            "GetAuthorizationToken" => self.get_authorization_token(&request),
267            "PutLifecyclePolicy" => self.put_lifecycle_policy(&request),
268            "GetLifecyclePolicy" => self.get_lifecycle_policy(&request),
269            "DeleteLifecyclePolicy" => self.delete_lifecycle_policy(&request),
270            "StartLifecyclePolicyPreview" => self.start_lifecycle_policy_preview(&request),
271            "GetLifecyclePolicyPreview" => self.get_lifecycle_policy_preview(&request),
272            "StartImageScan" => self.start_image_scan(&request),
273            "DescribeImageScanFindings" => self.describe_image_scan_findings(&request),
274            "DescribeRegistry" => self.describe_registry(&request),
275            "GetRegistryPolicy" => self.get_registry_policy(&request),
276            "PutRegistryPolicy" => self.put_registry_policy(&request),
277            "DeleteRegistryPolicy" => self.delete_registry_policy(&request),
278            "GetRegistryScanningConfiguration" => {
279                self.get_registry_scanning_configuration(&request)
280            }
281            "PutRegistryScanningConfiguration" => {
282                self.put_registry_scanning_configuration(&request)
283            }
284            "BatchGetRepositoryScanningConfiguration" => {
285                self.batch_get_repository_scanning_configuration(&request)
286            }
287            "PutReplicationConfiguration" => self.put_replication_configuration(&request),
288            "DescribeImageReplicationStatus" => self.describe_image_replication_status(&request),
289            "CreatePullThroughCacheRule" => self.create_pull_through_cache_rule(&request),
290            "DeletePullThroughCacheRule" => self.delete_pull_through_cache_rule(&request),
291            "DescribePullThroughCacheRules" => self.describe_pull_through_cache_rules(&request),
292            "UpdatePullThroughCacheRule" => self.update_pull_through_cache_rule(&request),
293            "ValidatePullThroughCacheRule" => self.validate_pull_through_cache_rule(&request),
294            "GetAccountSetting" => self.get_account_setting(&request),
295            "PutAccountSetting" => self.put_account_setting(&request),
296            "CreateRepositoryCreationTemplate" => {
297                self.create_repository_creation_template(&request)
298            }
299            "DeleteRepositoryCreationTemplate" => {
300                self.delete_repository_creation_template(&request)
301            }
302            "DescribeRepositoryCreationTemplates" => {
303                self.describe_repository_creation_templates(&request)
304            }
305            "UpdateRepositoryCreationTemplate" => {
306                self.update_repository_creation_template(&request)
307            }
308            "GetSigningConfiguration" => self.get_signing_configuration(&request),
309            "PutSigningConfiguration" => self.put_signing_configuration(&request),
310            "DeleteSigningConfiguration" => self.delete_signing_configuration(&request),
311            "DescribeImageSigningStatus" => self.describe_image_signing_status(&request),
312            "RegisterPullTimeUpdateExclusion" => self.register_pull_time_update_exclusion(&request),
313            "DeregisterPullTimeUpdateExclusion" => {
314                self.deregister_pull_time_update_exclusion(&request)
315            }
316            "ListPullTimeUpdateExclusions" => self.list_pull_time_update_exclusions(&request),
317            "ListImageReferrers" => self.list_image_referrers(&request),
318            "UpdateImageStorageClass" => self.update_image_storage_class(&request),
319            _ => Err(AwsServiceError::action_not_implemented(
320                self.service_name(),
321                &request.action,
322            )),
323        };
324        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
325            self.save_snapshot().await;
326        }
327        result
328    }
329
330    fn supported_actions(&self) -> &[&str] {
331        SUPPORTED_ACTIONS
332    }
333}
334
335// -------- helpers --------
336
337// -------- operations --------
338
339impl EcrService {}
340
341// -------- image + layer helpers --------
342
343// -------- image + layer operations --------
344
345impl EcrService {
346    /// Mark `digest` as `IN_PROGRESS` and spawn the scanner task.
347    /// Identical wiring to `start_image_scan` minus the request-shaped
348    /// response — used both by the user-facing `StartImageScan` and the
349    /// `scan_on_push=true` PutImage hook.
350    fn trigger_scan(&self, account: &str, name: &str, digest: &str) {
351        use crate::state::ImageScanFindings;
352        let layers = {
353            let mut accounts = self.state.write();
354            let Some(state) = accounts.get_mut(account) else {
355                return;
356            };
357            let Some(repo) = state.repositories.get_mut(name) else {
358                return;
359            };
360            repo.scan_findings.insert(
361                digest.to_string(),
362                ImageScanFindings {
363                    image_digest: digest.to_string(),
364                    scan_status: "IN_PROGRESS".to_string(),
365                    scan_completed_at: None,
366                    vulnerability_source_updated_at: None,
367                    finding_severity_counts: BTreeMap::new(),
368                    findings: Vec::new(),
369                },
370            );
371            layers_for_image(repo, digest)
372        };
373        let shared = self.state.clone();
374        let store = self.snapshot_store.clone();
375        let snap_lock = self.snapshot_lock.clone();
376        let account = account.to_string();
377        let name = name.to_string();
378        let digest = digest.to_string();
379        tokio::spawn(async move {
380            let result = crate::scanner::scan_layers(&digest, &layers).await;
381            {
382                let mut accounts = shared.write();
383                let Some(state) = accounts.get_mut(&account) else {
384                    return;
385                };
386                let Some(repo) = state.repositories.get_mut(&name) else {
387                    return;
388                };
389                let findings = result.unwrap_or_else(|| ImageScanFindings {
390                    image_digest: digest.clone(),
391                    scan_status: "COMPLETE".to_string(),
392                    scan_completed_at: Some(Utc::now()),
393                    vulnerability_source_updated_at: Some(Utc::now()),
394                    finding_severity_counts: BTreeMap::new(),
395                    findings: Vec::new(),
396                });
397                repo.scan_findings.insert(digest.clone(), findings);
398            }
399            EcrService::save_snapshot_with(shared, store, snap_lock).await;
400        });
401    }
402}
403
404// -------- lifecycle + scan + registry + polish handlers (Batch 4) --------
405
406impl EcrService {
407    fn put_replication_configuration(
408        &self,
409        request: &AwsRequest,
410    ) -> Result<AwsResponse, AwsServiceError> {
411        use crate::state::{
412            ReplicationConfiguration, ReplicationDestination, ReplicationRule, RepositoryFilter,
413        };
414        let body = request.json_body();
415        let cfg_value = body
416            .get("replicationConfiguration")
417            .cloned()
418            .ok_or_else(|| invalid_parameter("Missing replicationConfiguration"))?;
419        let rules_value = cfg_value
420            .get("rules")
421            .and_then(|v| v.as_array())
422            .cloned()
423            .unwrap_or_default();
424        let rules: Vec<ReplicationRule> = rules_value
425            .iter()
426            .map(|r| ReplicationRule {
427                destinations: r
428                    .get("destinations")
429                    .and_then(|v| v.as_array())
430                    .map(|arr| {
431                        arr.iter()
432                            .map(|d| ReplicationDestination {
433                                region: d
434                                    .get("region")
435                                    .and_then(|v| v.as_str())
436                                    .unwrap_or("")
437                                    .to_string(),
438                                registry_id: d
439                                    .get("registryId")
440                                    .and_then(|v| v.as_str())
441                                    .unwrap_or("")
442                                    .to_string(),
443                            })
444                            .collect()
445                    })
446                    .unwrap_or_default(),
447                repository_filters: r
448                    .get("repositoryFilters")
449                    .and_then(|v| v.as_array())
450                    .map(|arr| {
451                        arr.iter()
452                            .map(|f| RepositoryFilter {
453                                filter: f
454                                    .get("filter")
455                                    .and_then(|v| v.as_str())
456                                    .unwrap_or("")
457                                    .to_string(),
458                                filter_type: f
459                                    .get("filterType")
460                                    .and_then(|v| v.as_str())
461                                    .unwrap_or("PREFIX_MATCH")
462                                    .to_string(),
463                            })
464                            .collect()
465                    })
466                    .unwrap_or_default(),
467            })
468            .collect();
469        let account = target_account_id(request, &body);
470        let mut accounts = self.state.write();
471        let state = accounts.get_or_create(&account);
472        state.replication_configuration = Some(ReplicationConfiguration { rules });
473        let cfg = state.replication_configuration.clone().unwrap();
474        Ok(AwsResponse::ok_json(json!({
475            "replicationConfiguration": {
476                "rules": cfg.rules.iter().map(|r| json!({
477                    "destinations": r.destinations.iter().map(|d| json!({
478                        "region": d.region,
479                        "registryId": d.registry_id,
480                    })).collect::<Vec<_>>(),
481                    "repositoryFilters": r.repository_filters.iter().map(|f| json!({
482                        "filter": f.filter,
483                        "filterType": f.filter_type,
484                    })).collect::<Vec<_>>(),
485                })).collect::<Vec<_>>(),
486            },
487        })))
488    }
489}
490
491#[path = "../service_helpers.rs"]
492mod service_helpers;
493pub use service_helpers::evaluate_lifecycle_policy;
494pub(crate) use service_helpers::*;
495
496#[cfg(test)]
497#[path = "../service_tests.rs"]
498mod tests;