Skip to main content

greentic_distributor_client/
dist.rs

1use crate::oci_components::{ComponentResolveOptions, DefaultRegistryClient, OciComponentResolver};
2use crate::store_auth::{
3    StoreCredentials, default_store_auth_path, default_store_state_path, load_login,
4};
5use async_trait::async_trait;
6use oci_distribution::Reference;
7use reqwest::Url;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::cell::OnceCell;
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{SystemTime, UNIX_EPOCH};
16use thiserror::Error;
17
/// Media type used for an artifact when no explicit content type is available.
const WASM_CONTENT_TYPE: &str = "application/wasm";
/// Process-wide atomic counter, initialised to 1.
/// NOTE(review): its consumers are outside this excerpt; presumably it orders
/// "last used" stamps for cache entries — confirm.
static LAST_USED_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Version number of the cache format.
/// NOTE(review): presumably the value stored in `CacheEntry::format_version` — confirm.
const CACHE_FORMAT_VERSION: u32 = 1;
22
/// Classifies where an artifact reference points.
///
/// Every variant except `Fixture` is produced by `LegacyArtifactSource::kind`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ArtifactSourceKind {
    Oci,
    Https,
    File,
    Fixture,
    Repo,
    Store,
    CacheDigest,
}
33
/// Transport-related flags attached to an [`ArtifactSource`].
///
/// Both flags are `false` in the derived `Default`.
/// NOTE(review): the names mirror `DistOptions::offline` and
/// `DistOptions::allow_insecure_local_http`; how consumers honour them is not
/// visible in this part of the file.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TransportHints {
    pub offline: bool,
    pub allow_insecure_local_http: bool,
}
39
/// Public description of an artifact's origin: the reference string, its
/// classified kind, and transport/dev-mode flags.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtifactSource {
    /// Reference string for the source.
    pub raw_ref: String,
    /// Classification of `raw_ref`.
    pub kind: ArtifactSourceKind,
    pub transport_hints: TransportHints,
    /// `artifact_source_from_legacy` sets this for `File` and `Fixture` kinds.
    pub dev_mode: bool,
}
47
/// Category of artifact that a descriptor or cache entry refers to.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ArtifactType {
    Bundle,
    Pack,
    Component,
}
54
/// Records the route by which a reference was resolved.
///
/// Values are assigned by `LegacyArtifactSource::resolved_via` and by the
/// descriptor-resolution code in `DistClient`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResolvedVia {
    Direct,
    TagResolution,
    RepoMapping,
    StoreMapping,
    Fixture,
    File,
    Https,
    CacheDigest,
}
66
/// Metadata describing an artifact: what it is, where it came from, and how it
/// was resolved.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtifactDescriptor {
    pub artifact_type: ArtifactType,
    pub source_kind: ArtifactSourceKind,
    /// Reference string of the source.
    pub raw_ref: String,
    /// Canonical form of the reference.
    pub canonical_ref: String,
    /// Content digest; the HTTPS descriptor path leaves this empty.
    pub digest: String,
    pub media_type: String,
    /// Payload size in bytes; `0` is used where the size is not known.
    pub size_bytes: u64,
    pub created_at: Option<u64>,
    pub annotations: serde_json::Map<String, serde_json::Value>,
    pub manifest_digest: Option<String>,
    pub resolved_via: ResolvedVia,
    pub signature_refs: Vec<String>,
    pub sbom_refs: Vec<String>,
}
83
84impl ArtifactDescriptor {
85    pub fn cache_key(&self) -> String {
86        self.digest.clone()
87    }
88}
89
/// Integrity status recorded on a [`ResolvedArtifact`].
///
/// `ResolvedArtifact::from_path` always starts an artifact at `Ready`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IntegrityState {
    Partial,
    Ready,
    Corrupt,
    Evicted,
}
97
/// Snapshot of the source references an artifact was resolved from.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct SourceSnapshot {
    pub raw_ref: String,
    pub canonical_ref: String,
    pub source_kind: ArtifactSourceKind,
    /// `ResolvedArtifact::from_path` sets this to `raw_ref == canonical_ref`.
    pub authoritative: bool,
}
105
/// State of a [`CacheEntry`]; the variant names mirror [`IntegrityState`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum CacheEntryState {
    Partial,
    Ready,
    Corrupt,
    Evicted,
}
113
/// Metadata record describing one cached artifact.
///
/// NOTE(review): the unit of the `*_at` timestamps is not shown in this part
/// of the file — presumably seconds since the Unix epoch; confirm.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CacheEntry {
    pub format_version: u32,
    pub cache_key: String,
    pub digest: String,
    pub media_type: String,
    pub size_bytes: u64,
    pub artifact_type: ArtifactType,
    pub source_kind: ArtifactSourceKind,
    pub raw_ref: String,
    pub canonical_ref: String,
    pub fetched_at: u64,
    pub last_accessed_at: u64,
    pub last_verified_at: Option<u64>,
    pub state: CacheEntryState,
    pub advisory_epoch: Option<u64>,
    pub signature_summary: Option<serde_json::Value>,
    /// Filesystem location of the cached payload.
    pub local_path: PathBuf,
    pub source_snapshot: SourceSnapshot,
}
134
/// Field-less policy type for resolution; it carries no configuration.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ResolvePolicy;
137
/// Field-less policy type for caching; it carries no configuration.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct CachePolicy;
140
/// Aggregate counters and refusal messages summarising a retention evaluation.
///
/// All counters are zero and `refusals` is empty in the derived `Default`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetentionReport {
    pub scanned_entries: usize,
    pub kept: usize,
    pub evicted: usize,
    pub protected: usize,
    pub bytes_reclaimed: u64,
    pub refusals: Vec<String>,
}
150
/// Deployment environment supplied to a retention evaluation
/// (see `RetentionInput::environment`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RetentionEnvironment {
    Dev,
    Staging,
    Prod,
}
157
/// Inputs to a retention evaluation: the cache entries under consideration,
/// bundle ids grouped by lifecycle/usage, and the configured limits.
///
/// NOTE(review): the evaluation logic is outside this excerpt; how each id
/// list and limit influences the outcome should be confirmed there.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetentionInput {
    pub entries: Vec<CacheEntry>,
    pub active_bundle_ids: Vec<String>,
    pub staged_bundle_ids: Vec<String>,
    pub warming_bundle_ids: Vec<String>,
    pub ready_bundle_ids: Vec<String>,
    pub draining_bundle_ids: Vec<String>,
    pub session_referenced_bundle_ids: Vec<String>,
    pub max_cache_bytes: u64,
    pub max_entry_age: Option<u64>,
    pub minimum_rollback_depth: usize,
    pub environment: RetentionEnvironment,
}
172
/// Verdict attached to a single entry in a [`RetentionDecision`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum RetentionDisposition {
    Keep,
    Evict,
    Protect,
}
179
/// Retention verdict for one cache key / bundle id, with a machine-readable
/// reason code and a free-form detail string.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetentionDecision {
    pub cache_key: String,
    pub bundle_id: String,
    pub decision: RetentionDisposition,
    pub reason_code: String,
    pub reason_detail: String,
}
188
/// Result of a retention evaluation: the per-entry decisions plus the
/// aggregate report.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RetentionOutcome {
    pub decisions: Vec<RetentionDecision>,
    pub report: RetentionReport,
}
194
/// Deployment environment a [`VerificationPolicy`] applies to.
///
/// `VerificationPolicy::default()` uses `Dev`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VerificationEnvironment {
    Dev,
    Staging,
    Prod,
}
201
/// Policy inputs for artifact verification: signature/SBOM requirements,
/// issuer and digest lists, allowed media types, and the target environment.
///
/// NOTE(review): the code that evaluates these fields is outside this
/// excerpt; in particular, confirm there how an empty list is interpreted.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VerificationPolicy {
    pub require_signature: bool,
    pub trusted_issuers: Vec<String>,
    pub deny_issuers: Vec<String>,
    pub deny_digests: Vec<String>,
    pub allowed_media_types: Vec<String>,
    pub require_sbom: bool,
    pub minimum_operator_version: Option<String>,
    pub environment: VerificationEnvironment,
}
213
214impl Default for VerificationPolicy {
215    fn default() -> Self {
216        Self {
217            require_signature: false,
218            trusted_issuers: Vec::new(),
219            deny_issuers: Vec::new(),
220            deny_digests: Vec::new(),
221            allowed_media_types: Vec::new(),
222            require_sbom: false,
223            minimum_operator_version: None,
224            environment: VerificationEnvironment::Dev,
225        }
226    }
227}
228
/// A versioned set of advisories: denied digests and issuers, an optional
/// minimum operator version, and an optional release-train descriptor.
///
/// NOTE(review): the unit of `issued_at`, `expires_at` and
/// `next_refresh_hint` is not shown here — confirm against the producer.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct AdvisorySet {
    pub version: String,
    pub issued_at: u64,
    pub source: String,
    pub deny_digests: Vec<String>,
    pub deny_issuers: Vec<String>,
    pub minimum_operator_version: Option<String>,
    pub release_train: Option<ReleaseTrainDescriptor>,
    pub expires_at: Option<u64>,
    pub next_refresh_hint: Option<u64>,
}
241
/// Describes a release train by id together with the digests associated
/// with it (see `AdvisorySet::release_train`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReleaseTrainDescriptor {
    pub train_id: String,
    pub operator_digest: Option<String>,
    pub bundle_digests: Vec<String>,
    pub required_extension_digests: Vec<String>,
    pub baseline_observer_digest: Option<String>,
}
250
/// Outcome of a single [`VerificationCheck`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum VerificationOutcome {
    Passed,
    Failed,
    Warning,
    Skipped,
}
258
/// One named verification check with its outcome, a detail message, and an
/// optional JSON payload.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VerificationCheck {
    pub name: String,
    pub outcome: VerificationOutcome,
    pub detail: String,
    pub payload: Option<serde_json::Value>,
}
266
/// Full verification report for one artifact: the individual checks, the
/// overall pass flag, collected warnings/errors, and fingerprints/versions
/// identifying the inputs the report was produced against.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct VerificationReport {
    pub artifact_digest: String,
    pub canonical_ref: String,
    pub checks: Vec<VerificationCheck>,
    pub passed: bool,
    pub warnings: Vec<String>,
    pub errors: Vec<String>,
    pub policy_fingerprint: String,
    pub advisory_version: Option<String>,
    pub cache_entry_fingerprint: Option<String>,
}
279
/// Verification result carrying only the pass flag, checks, warnings and
/// errors — a subset of the fields of [`VerificationReport`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct PreliminaryDecision {
    pub passed: bool,
    pub checks: Vec<VerificationCheck>,
    pub warnings: Vec<String>,
    pub errors: Vec<String>,
}
287
/// Access mode requested when staging a bundle
/// (see `StageBundleInput::requested_access_mode`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum AccessMode {
    Userspace,
    Mount,
}
293
/// Mode reported by an opener for how a bundle was opened
/// (see `ArtifactOpenOutput::bundle_open_mode`).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BundleOpenMode {
    CacheReuse,
    Userspace,
    Mount,
}
300
/// Request parameters for staging a bundle: the bundle reference, requested
/// access mode, policy references, and optional tenant/team scoping.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StageBundleInput {
    pub bundle_ref: String,
    pub requested_access_mode: AccessMode,
    pub verification_policy_ref: String,
    pub cache_policy_ref: String,
    pub tenant: Option<String>,
    pub team: Option<String>,
}
310
/// Request parameters for warming a bundle identified by id and cache key.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WarmBundleInput {
    pub bundle_id: String,
    pub cache_key: String,
    pub smoke_test: bool,
    pub dry_run: bool,
    pub expected_operator_version: Option<String>,
}
319
/// Request parameters for rolling back to a target bundle, optionally
/// stating the cache key the caller expects.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RollbackBundleInput {
    pub target_bundle_id: String,
    pub expected_cache_key: Option<String>,
}
325
/// Summary of an opened artifact's manifest, as returned by an opener in
/// [`ArtifactOpenOutput`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BundleManifestSummary {
    pub component_id: String,
    pub abi_version: Option<String>,
    pub describe_artifact_ref: Option<String>,
    pub artifact_type: ArtifactType,
    pub media_type: String,
    pub size_bytes: u64,
}
335
/// Audit record for a stage operation: a timestamp plus the request fields
/// that also appear on [`StageBundleInput`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StageAuditFields {
    pub staged_at: u64,
    pub requested_access_mode: AccessMode,
    pub verification_policy_ref: String,
    pub cache_policy_ref: String,
    pub tenant: Option<String>,
    pub team: Option<String>,
}
345
/// Audit record for a warm operation: a timestamp, the request flags, and
/// whether the bundle was reopened from cache.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WarmAuditFields {
    pub warmed_at: u64,
    pub smoke_test: bool,
    pub dry_run: bool,
    pub reopened_from_cache: bool,
}
353
/// Audit record for a rollback operation: a timestamp, whether the bundle
/// was reopened from cache, and the cache key the caller expected (if any).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RollbackAuditFields {
    pub rolled_back_at: u64,
    pub reopened_from_cache: bool,
    pub expected_cache_key: Option<String>,
}
360
/// Result of staging a bundle: identifiers, descriptor, the resolved
/// artifact, its verification report, cache entry, and audit fields.
///
/// Only `Debug` is derived; the embedded [`ResolvedArtifact`] itself derives
/// only `Debug`.
#[derive(Debug)]
pub struct StageBundleResult {
    pub bundle_id: String,
    pub canonical_ref: String,
    pub descriptor: ArtifactDescriptor,
    pub resolved_artifact: ResolvedArtifact,
    pub verification_report: VerificationReport,
    pub cache_entry: CacheEntry,
    pub stage_audit_fields: StageAuditFields,
}
371
/// Result of warming a bundle: verification report, manifest summary, open
/// mode, collected warnings/errors, and audit fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WarmBundleResult {
    pub bundle_id: String,
    pub verification_report: VerificationReport,
    pub bundle_manifest_summary: BundleManifestSummary,
    pub bundle_open_mode: BundleOpenMode,
    pub warnings: Vec<String>,
    pub errors: Vec<String>,
    pub warm_audit_fields: WarmAuditFields,
}
382
/// Result of a rollback: the bundle id, whether it was reopened from cache,
/// its cache entry, verification report, and audit fields.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RollbackBundleResult {
    pub bundle_id: String,
    pub reopened_from_cache: bool,
    pub cache_entry: CacheEntry,
    pub verification_report: VerificationReport,
    pub rollback_audit_fields: RollbackAuditFields,
}
391
/// Record tying a bundle id to its cache key, canonical reference, source
/// kind, fetch time, and current lifecycle state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BundleRecord {
    pub bundle_id: String,
    pub cache_key: String,
    pub canonical_ref: String,
    pub source_kind: ArtifactSourceKind,
    pub fetched_at: u64,
    pub lifecycle_state: BundleLifecycleState,
}
401
/// Lifecycle state stored on a [`BundleRecord`].
///
/// NOTE(review): the allowed transitions between states are not defined in
/// this part of the file.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum BundleLifecycleState {
    Inactive,
    Staged,
    Warming,
    Ready,
    Draining,
}
410
/// Request passed to [`ArtifactOpener::open`]: the bundle id plus the
/// `dry_run` and `smoke_test` flags.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtifactOpenRequest {
    pub bundle_id: String,
    pub dry_run: bool,
    pub smoke_test: bool,
}
417
/// Successful output of [`ArtifactOpener::open`]: the manifest summary, the
/// mode the artifact was opened in, and any warnings.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ArtifactOpenOutput {
    pub bundle_manifest_summary: BundleManifestSummary,
    pub bundle_open_mode: BundleOpenMode,
    pub warnings: Vec<String>,
}
424
/// Deprecated alias kept for backward compatibility; identical to
/// [`ArtifactOpenRequest`].
#[deprecated(note = "use ArtifactOpenRequest instead")]
pub type BundleOpenRequest = ArtifactOpenRequest;
427
/// Deprecated alias kept for backward compatibility; identical to
/// [`ArtifactOpenOutput`].
#[deprecated(note = "use ArtifactOpenOutput instead")]
pub type BundleOpenOutput = ArtifactOpenOutput;
430
/// Machine-readable error category carried by [`IntegrationError`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum IntegrationErrorCode {
    InvalidReference,
    UnsupportedSource,
    ResolutionFailed,
    DownloadFailed,
    ResolutionUnavailable,
    DigestMismatch,
    MediaTypeRejected,
    IssuerRejected,
    DigestDenied,
    SignatureRequired,
    CacheCorrupt,
    CacheMiss,
    OfflineRequiredButUnavailable,
    UnsupportedArtifactType,
    DescriptorCorrupt,
    PolicyInputInvalid,
    AdvisoryRejected,
    VerificationFailed,
    BundleOpenFailed,
}
453
/// Serializable error value: a code, a summary message, a retryability flag,
/// and optional structured details.
///
/// This is the error type returned by [`ArtifactOpener::open`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct IntegrationError {
    pub code: IntegrationErrorCode,
    pub summary: String,
    pub retryable: bool,
    pub details: Option<serde_json::Value>,
}
461
/// Pluggable opener for resolved artifacts.
///
/// Implementors must be `Send + Sync`; `DistClient` stores one as
/// `Arc<dyn ArtifactOpener>`.
pub trait ArtifactOpener: Send + Sync {
    /// Opens `artifact` according to `request`.
    ///
    /// Returns an [`ArtifactOpenOutput`] on success or an
    /// [`IntegrationError`] on failure.
    fn open(
        &self,
        artifact: &ResolvedArtifact,
        request: &ArtifactOpenRequest,
    ) -> Result<ArtifactOpenOutput, IntegrationError>;
}
469
/// Deprecated predecessor of [`ArtifactOpener`] with an identical `open`
/// signature.
///
/// A blanket impl in this file makes every `BundleOpener` usable as an
/// `ArtifactOpener`.
#[deprecated(
    note = "use ArtifactOpener instead; format ownership belongs outside distributor-client"
)]
pub trait BundleOpener: Send + Sync {
    /// Opens `artifact` according to `request`.
    fn open(
        &self,
        artifact: &ResolvedArtifact,
        request: &ArtifactOpenRequest,
    ) -> Result<ArtifactOpenOutput, IntegrationError>;
}
480
481#[allow(deprecated)]
482impl<T: BundleOpener + ?Sized> ArtifactOpener for T {
483    fn open(
484        &self,
485        artifact: &ResolvedArtifact,
486        request: &ArtifactOpenRequest,
487    ) -> Result<ArtifactOpenOutput, IntegrationError> {
488        BundleOpener::open(self, artifact, request)
489    }
490}
491
/// Configuration for a `DistClient`.
///
/// The `Default` impl populates most fields from `GREENTIC_*` environment
/// variables; the per-field notes below describe those defaults.
#[derive(Clone, Debug)]
pub struct DistOptions {
    /// Cache root. Default: `GREENTIC_CACHE_DIR`, else
    /// `GREENTIC_DIST_CACHE_DIR`, else `default_distribution_cache_root()`.
    pub cache_dir: PathBuf,
    /// Default: `true`. Forwarded to the OCI resolver options.
    pub allow_tags: bool,
    /// Default: `true` only when `GREENTIC_DIST_OFFLINE` is exactly `"1"`.
    pub offline: bool,
    /// Default: `true` only when `GREENTIC_DIST_ALLOW_INSECURE_LOCAL_HTTP`
    /// is exactly `"1"`.
    pub allow_insecure_local_http: bool,
    /// Default: `GREENTIC_CACHE_MAX_BYTES` parsed as `u64`, else 3 GiB.
    pub cache_max_bytes: u64,
    /// Default: `GREENTIC_REPO_REGISTRY_BASE` if set. `repo://` references
    /// fail with `ResolutionUnavailable` when this is `None`.
    pub repo_registry_base: Option<String>,
    /// Default: `GREENTIC_STORE_REGISTRY_BASE` if set.
    pub store_registry_base: Option<String>,
    /// Default: `default_store_auth_path()`.
    pub store_auth_path: PathBuf,
    /// Default: `default_store_state_path()`.
    pub store_state_path: PathBuf,
    /// Default: `GREENTIC_FIXTURE_DIR` if set. Only present with the
    /// `fixture-resolver` feature.
    #[cfg(feature = "fixture-resolver")]
    pub fixture_dir: Option<PathBuf>,
}
506
507impl Default for DistOptions {
508    fn default() -> Self {
509        let offline = std::env::var("GREENTIC_DIST_OFFLINE").is_ok_and(|v| v == "1");
510        let allow_insecure_local_http =
511            std::env::var("GREENTIC_DIST_ALLOW_INSECURE_LOCAL_HTTP").is_ok_and(|v| v == "1");
512        let cache_max_bytes = std::env::var("GREENTIC_CACHE_MAX_BYTES")
513            .ok()
514            .and_then(|v| v.parse::<u64>().ok())
515            .unwrap_or(3 * 1024 * 1024 * 1024);
516        let cache_dir = std::env::var("GREENTIC_CACHE_DIR")
517            .or_else(|_| std::env::var("GREENTIC_DIST_CACHE_DIR"))
518            .map(PathBuf::from)
519            .unwrap_or_else(|_| default_distribution_cache_root());
520        Self {
521            cache_dir,
522            allow_tags: true,
523            offline,
524            allow_insecure_local_http,
525            cache_max_bytes,
526            repo_registry_base: std::env::var("GREENTIC_REPO_REGISTRY_BASE").ok(),
527            store_registry_base: std::env::var("GREENTIC_STORE_REGISTRY_BASE").ok(),
528            store_auth_path: default_store_auth_path(),
529            store_state_path: default_store_state_path(),
530            #[cfg(feature = "fixture-resolver")]
531            fixture_dir: std::env::var("GREENTIC_FIXTURE_DIR")
532                .ok()
533                .map(PathBuf::from),
534        }
535    }
536}
537
/// A resolved artifact together with its descriptor, local location, and
/// payload.
///
/// Several fields carry the same value: `from_path` fills `cache_key`,
/// `resolved_digest` and `digest` from the resolved digest, and
/// `local_path`, `wasm_path` and `cache_path` from the payload path.
#[derive(Debug)]
pub struct ResolvedArtifact {
    pub descriptor: ArtifactDescriptor,
    pub cache_key: String,
    pub local_path: PathBuf,
    /// Time of construction as returned by `unix_now()`.
    pub fetched_at: u64,
    pub integrity_state: IntegrityState,
    pub source_snapshot: SourceSnapshot,
    pub resolved_digest: String,
    /// In-memory payload. Exactly one of `wasm_bytes` / `wasm_path` must be
    /// set; `validate_payload` enforces this.
    pub wasm_bytes: Option<Vec<u8>>,
    /// On-disk payload location; see `wasm_bytes`.
    pub wasm_path: Option<PathBuf>,
    pub component_id: String,
    pub abi_version: Option<String>,
    pub describe_artifact_ref: Option<String>,
    pub content_length: Option<u64>,
    pub content_type: Option<String>,
    pub fetched: bool,
    pub source: ArtifactSource,
    pub digest: String,
    pub cache_path: Option<PathBuf>,
    /// Lazily filled copy of the file at `wasm_path`, populated by the first
    /// successful `wasm_bytes()` call on a path-backed artifact.
    loaded_wasm_bytes: OnceCell<Vec<u8>>,
}
560
561impl ResolvedArtifact {
562    #[allow(clippy::too_many_arguments)]
563    fn from_path(
564        resolved_digest: String,
565        wasm_path: PathBuf,
566        component_id: String,
567        abi_version: Option<String>,
568        describe_artifact_ref: Option<String>,
569        content_length: Option<u64>,
570        content_type: Option<String>,
571        fetched: bool,
572        source: LegacyArtifactSource,
573    ) -> Self {
574        let source_kind = source.kind();
575        let canonical_ref = source.canonical_ref(&resolved_digest);
576        let fetched_at = unix_now();
577        let public_source = artifact_source_from_legacy(&source);
578        let descriptor = ArtifactDescriptor {
579            artifact_type: ArtifactType::Component,
580            source_kind: source_kind.clone(),
581            raw_ref: public_source.raw_ref.clone(),
582            canonical_ref: canonical_ref.clone(),
583            digest: resolved_digest.clone(),
584            media_type: content_type
585                .clone()
586                .unwrap_or_else(|| WASM_CONTENT_TYPE.to_string()),
587            size_bytes: content_length.unwrap_or_default(),
588            created_at: None,
589            annotations: serde_json::Map::new(),
590            manifest_digest: None,
591            resolved_via: source.resolved_via(),
592            signature_refs: Vec::new(),
593            sbom_refs: Vec::new(),
594        };
595        let source_snapshot = SourceSnapshot {
596            raw_ref: descriptor.raw_ref.clone(),
597            canonical_ref: descriptor.canonical_ref.clone(),
598            source_kind: descriptor.source_kind.clone(),
599            authoritative: descriptor.raw_ref == descriptor.canonical_ref,
600        };
601        Self {
602            descriptor,
603            cache_key: resolved_digest.clone(),
604            local_path: wasm_path.clone(),
605            fetched_at,
606            integrity_state: IntegrityState::Ready,
607            source_snapshot,
608            digest: resolved_digest.clone(),
609            cache_path: Some(wasm_path.clone()),
610            resolved_digest,
611            wasm_bytes: None,
612            wasm_path: Some(wasm_path),
613            component_id,
614            abi_version,
615            describe_artifact_ref,
616            content_length,
617            content_type,
618            fetched,
619            source: public_source,
620            loaded_wasm_bytes: OnceCell::new(),
621        }
622    }
623
624    pub fn validate_payload(&self) -> Result<(), DistError> {
625        let has_bytes = self.wasm_bytes.is_some();
626        let has_path = self.wasm_path.is_some();
627        if has_bytes == has_path {
628            return Err(DistError::CorruptArtifact {
629                reference: self.resolved_digest.clone(),
630                reason: "expected exactly one of wasm_bytes or wasm_path".into(),
631            });
632        }
633        Ok(())
634    }
635
636    pub fn wasm_bytes(&self) -> Result<&[u8], DistError> {
637        self.validate_payload()?;
638        if let Some(bytes) = self.wasm_bytes.as_deref() {
639            return Ok(bytes);
640        }
641        let path = self
642            .wasm_path
643            .as_ref()
644            .ok_or_else(|| DistError::CorruptArtifact {
645                reference: self.resolved_digest.clone(),
646                reason: "missing wasm path".into(),
647            })?;
648        if self.loaded_wasm_bytes.get().is_none() {
649            let loaded = fs::read(path).map_err(|source| DistError::CacheError {
650                path: path.display().to_string(),
651                source,
652            })?;
653            let _ = self.loaded_wasm_bytes.set(loaded);
654        }
655        Ok(self
656            .loaded_wasm_bytes
657            .get()
658            .expect("loaded_wasm_bytes must be set")
659            .as_slice())
660    }
661
662    pub fn lock_hint(&self, source_ref: impl Into<String>) -> LockHint {
663        LockHint {
664            source_ref: source_ref.into(),
665            resolved_digest: self.resolved_digest.clone(),
666            content_length: self.content_length,
667            content_type: self.content_type.clone(),
668            abi_version: self.abi_version.clone(),
669            component_id: self.component_id.clone(),
670        }
671    }
672}
673
/// Values copied out of a [`ResolvedArtifact`] by `ResolvedArtifact::lock_hint`,
/// paired with the caller-supplied source reference.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LockHint {
    pub source_ref: String,
    pub resolved_digest: String,
    pub content_length: Option<u64>,
    pub content_type: Option<String>,
    pub abi_version: Option<String>,
    pub component_id: String,
}
683
/// Internal representation of an artifact source, convertible to and from
/// the public [`ArtifactSource`].
#[derive(Clone, Debug)]
enum LegacyArtifactSource {
    /// Digest-only source; carries no reference string.
    Digest,
    /// URL string.
    Http(String),
    /// Local filesystem path.
    File(PathBuf),
    /// OCI reference stored without the `oci://` prefix (`raw_ref` adds it).
    Oci(String),
    /// Repo reference; `canonical_ref` strips a leading `repo://`.
    Repo(String),
    /// Store reference; `canonical_ref` strips a leading `store://`.
    Store(String),
}
693
694impl LegacyArtifactSource {
695    fn kind(&self) -> ArtifactSourceKind {
696        match self {
697            Self::Digest => ArtifactSourceKind::CacheDigest,
698            Self::Http(_) => ArtifactSourceKind::Https,
699            Self::File(_) => ArtifactSourceKind::File,
700            Self::Oci(_) => ArtifactSourceKind::Oci,
701            Self::Repo(_) => ArtifactSourceKind::Repo,
702            Self::Store(_) => ArtifactSourceKind::Store,
703        }
704    }
705
706    fn raw_ref(&self) -> String {
707        match self {
708            Self::Digest => String::new(),
709            Self::Http(url) => url.clone(),
710            Self::File(path) => path.display().to_string(),
711            Self::Oci(reference) => format!("oci://{reference}"),
712            Self::Repo(reference) => reference.clone(),
713            Self::Store(reference) => reference.clone(),
714        }
715    }
716
717    fn canonical_ref(&self, digest: &str) -> String {
718        match self {
719            Self::Digest => digest.to_string(),
720            Self::Http(url) => format!("{url}@{digest}"),
721            Self::File(path) => format!("file://{}@{digest}", path.display()),
722            Self::Oci(reference) => canonical_oci_ref(reference, digest),
723            Self::Repo(reference) | Self::Store(reference) => {
724                let raw = reference
725                    .trim_start_matches("repo://")
726                    .trim_start_matches("store://");
727                canonical_oci_ref(raw, digest)
728            }
729        }
730    }
731
732    fn resolved_via(&self) -> ResolvedVia {
733        match self {
734            Self::Digest => ResolvedVia::CacheDigest,
735            Self::Http(_) => ResolvedVia::Https,
736            Self::File(_) => ResolvedVia::File,
737            Self::Oci(reference) => {
738                if reference.contains(':') && !reference.contains("@sha256:") {
739                    ResolvedVia::TagResolution
740                } else {
741                    ResolvedVia::Direct
742                }
743            }
744            Self::Repo(_) => ResolvedVia::RepoMapping,
745            Self::Store(_) => ResolvedVia::StoreMapping,
746        }
747    }
748}
749
750fn artifact_source_from_legacy(source: &LegacyArtifactSource) -> ArtifactSource {
751    ArtifactSource {
752        raw_ref: source.raw_ref(),
753        kind: source.kind(),
754        transport_hints: TransportHints::default(),
755        dev_mode: matches!(
756            source.kind(),
757            ArtifactSourceKind::Fixture | ArtifactSourceKind::File
758        ),
759    }
760}
761
762fn legacy_source_from_public(source: &ArtifactSource) -> LegacyArtifactSource {
763    match source.kind {
764        ArtifactSourceKind::CacheDigest => LegacyArtifactSource::Digest,
765        ArtifactSourceKind::Https => LegacyArtifactSource::Http(source.raw_ref.clone()),
766        ArtifactSourceKind::File | ArtifactSourceKind::Fixture => {
767            LegacyArtifactSource::File(PathBuf::from(source.raw_ref.clone()))
768        }
769        ArtifactSourceKind::Oci => {
770            LegacyArtifactSource::Oci(source.raw_ref.trim_start_matches("oci://").to_string())
771        }
772        ArtifactSourceKind::Repo => LegacyArtifactSource::Repo(source.raw_ref.clone()),
773        ArtifactSourceKind::Store => LegacyArtifactSource::Store(source.raw_ref.clone()),
774    }
775}
776
/// Client for resolving distributed artifacts.
///
/// Instances are assembled by `DistClient::with_parts` from a
/// [`DistOptions`] value.
pub struct DistClient {
    /// Component cache rooted at `opts.cache_dir`.
    cache: ComponentCache,
    /// OCI resolver; its cache lives under `<cache_dir>/legacy-components`.
    oci: OciComponentResolver<DefaultRegistryClient>,
    /// HTTP client built with `no_proxy()`.
    http: reqwest::Client,
    opts: DistOptions,
    /// Optional hook supplied through `with_ref_injector`.
    injected: Option<Arc<dyn ResolveRefInjector>>,
    artifact_opener: Arc<dyn ArtifactOpener>,
}
785
/// Opener installed by `DistClient::new` and `DistClient::with_ref_injector`
/// when the caller does not supply one. Its behaviour is implemented
/// elsewhere in the file.
#[derive(Clone, Debug, Default)]
struct DefaultArtifactOpener;
788
/// Result of inspecting an OCI artifact's cache state: digest, cache
/// directory, selected media type, and whether a fetch occurred.
///
/// NOTE(review): the producer of this type is outside this excerpt; confirm
/// the exact meaning of `fetched` there.
#[derive(Clone, Debug)]
pub struct OciCacheInspection {
    pub digest: String,
    pub cache_dir: PathBuf,
    pub selected_media_type: String,
    pub fetched: bool,
}
796
/// Deprecated request type wrapping a single reference string.
#[derive(Clone, Debug)]
#[deprecated(note = "use ArtifactSource with DistClient::resolve/fetch instead")]
pub struct ResolveRefRequest {
    pub reference: String,
}
802
/// Deprecated request type: a reference string plus optional tenant, pack
/// and environment qualifiers.
#[derive(Clone, Debug, Default)]
#[deprecated(note = "use ArtifactSource with DistClient::resolve/fetch instead")]
pub struct ResolveComponentRequest {
    pub reference: String,
    pub tenant: Option<String>,
    pub pack: Option<String>,
    pub environment: Option<String>,
}
811
/// Answer a [`ResolveRefInjector`] can give for a reference.
#[derive(Clone, Debug)]
pub enum InjectedResolution {
    /// NOTE(review): presumably a replacement reference to resolve instead
    /// of the original — confirm against the code that consumes this variant.
    Redirect(String),
    /// Payload supplied directly as in-memory bytes.
    WasmBytes {
        resolved_digest: String,
        wasm_bytes: Vec<u8>,
        component_id: String,
        abi_version: Option<String>,
        source: ArtifactSource,
    },
    /// Payload supplied as a filesystem path.
    WasmPath {
        resolved_digest: String,
        wasm_path: PathBuf,
        component_id: String,
        abi_version: Option<String>,
        source: ArtifactSource,
    },
}
830
/// Hook that `DistClient` can be constructed with (`with_ref_injector`) to
/// supply resolutions for references.
#[async_trait]
pub trait ResolveRefInjector: Send + Sync {
    /// Attempts to resolve `reference`.
    ///
    /// NOTE(review): `Ok(None)` presumably means "no override, continue with
    /// normal resolution" — confirm against the caller.
    async fn resolve(&self, reference: &str) -> Result<Option<InjectedResolution>, DistError>;
}
835
836impl DistClient {
837    pub fn new(opts: DistOptions) -> Self {
838        Self::with_parts(opts, None, Arc::new(DefaultArtifactOpener))
839    }
840
841    pub fn with_ref_injector(opts: DistOptions, injector: Arc<dyn ResolveRefInjector>) -> Self {
842        Self::with_parts(opts, Some(injector), Arc::new(DefaultArtifactOpener))
843    }
844
845    pub fn with_artifact_opener(
846        opts: DistOptions,
847        artifact_opener: Arc<dyn ArtifactOpener>,
848    ) -> Self {
849        Self::with_parts(opts, None, artifact_opener)
850    }
851
852    #[allow(deprecated)]
853    #[deprecated(note = "use with_artifact_opener instead")]
854    pub fn with_bundle_opener<T: BundleOpener + 'static>(
855        opts: DistOptions,
856        bundle_opener: Arc<T>,
857    ) -> Self {
858        Self::with_artifact_opener(opts, bundle_opener)
859    }
860
861    fn with_parts(
862        opts: DistOptions,
863        injected: Option<Arc<dyn ResolveRefInjector>>,
864        artifact_opener: Arc<dyn ArtifactOpener>,
865    ) -> Self {
866        let oci_opts = ComponentResolveOptions {
867            allow_tags: opts.allow_tags,
868            offline: opts.offline,
869            cache_dir: opts.cache_dir.join("legacy-components"),
870            ..Default::default()
871        };
872        let http = reqwest::Client::builder()
873            .no_proxy()
874            .build()
875            .expect("failed to build http client");
876        Self {
877            cache: ComponentCache::new(opts.cache_dir.clone()),
878            oci: OciComponentResolver::new(oci_opts),
879            http,
880            opts,
881            injected,
882            artifact_opener,
883        }
884    }
885
886    async fn resolve_descriptor_from_reference(
887        &self,
888        reference: &str,
889    ) -> Result<ArtifactDescriptor, DistError> {
890        match classify_reference(reference)? {
891            RefKind::Digest(digest) => {
892                let entry = self.stat_cache(&digest)?;
893                Ok(descriptor_from_entry(&entry))
894            }
895            RefKind::Http(url) => {
896                if self.opts.offline {
897                    return Err(DistError::Offline { reference: url });
898                }
899                let normalized = ensure_secure_http_url(&url, self.opts.allow_insecure_local_http)?;
900                Ok(ArtifactDescriptor {
901                    artifact_type: ArtifactType::Component,
902                    source_kind: ArtifactSourceKind::Https,
903                    raw_ref: normalized.to_string(),
904                    canonical_ref: normalized.to_string(),
905                    digest: String::new(),
906                    media_type: WASM_CONTENT_TYPE.to_string(),
907                    size_bytes: 0,
908                    created_at: None,
909                    annotations: serde_json::Map::new(),
910                    manifest_digest: None,
911                    resolved_via: ResolvedVia::Https,
912                    signature_refs: Vec::new(),
913                    sbom_refs: Vec::new(),
914                })
915            }
916            RefKind::File(path) => {
917                let bytes = fs::read(&path).map_err(|source| DistError::CacheError {
918                    path: path.display().to_string(),
919                    source,
920                })?;
921                let digest = digest_for_bytes(&bytes);
922                Ok(ArtifactDescriptor {
923                    artifact_type: ArtifactType::Component,
924                    source_kind: ArtifactSourceKind::File,
925                    raw_ref: path.display().to_string(),
926                    canonical_ref: format!("file://{}@{}", path.display(), digest),
927                    digest,
928                    media_type: WASM_CONTENT_TYPE.to_string(),
929                    size_bytes: bytes.len() as u64,
930                    created_at: None,
931                    annotations: serde_json::Map::new(),
932                    manifest_digest: None,
933                    resolved_via: ResolvedVia::File,
934                    signature_refs: Vec::new(),
935                    sbom_refs: Vec::new(),
936                })
937            }
938            RefKind::Oci(reference) => self.resolve_oci_descriptor(&reference).await,
939            RefKind::Repo(target) => {
940                if self.opts.repo_registry_base.is_none() {
941                    return Err(DistError::ResolutionUnavailable {
942                        reference: format!("repo://{target}"),
943                    });
944                }
945                let mapped = map_registry_target(&target, self.opts.repo_registry_base.as_deref())
946                    .ok_or_else(|| DistError::ResolutionUnavailable {
947                        reference: format!("repo://{target}"),
948                    })?;
949                let mut descriptor = self.resolve_oci_descriptor(&mapped).await?;
950                descriptor.source_kind = ArtifactSourceKind::Repo;
951                descriptor.raw_ref = format!("repo://{target}");
952                descriptor.resolved_via = ResolvedVia::RepoMapping;
953                Ok(descriptor)
954            }
955            RefKind::Store(target) => {
956                if is_greentic_biz_store_target(&target) {
957                    return self.resolve_greentic_biz_store_descriptor(&target).await;
958                }
959                if self.opts.store_registry_base.is_none() {
960                    return Err(DistError::ResolutionUnavailable {
961                        reference: format!("store://{target}"),
962                    });
963                }
964                let mapped = map_registry_target(&target, self.opts.store_registry_base.as_deref())
965                    .ok_or_else(|| DistError::ResolutionUnavailable {
966                        reference: format!("store://{target}"),
967                    })?;
968                let mut descriptor = self.resolve_oci_descriptor(&mapped).await?;
969                descriptor.source_kind = ArtifactSourceKind::Store;
970                descriptor.raw_ref = format!("store://{target}");
971                descriptor.resolved_via = ResolvedVia::StoreMapping;
972                Ok(descriptor)
973            }
974            #[cfg(feature = "fixture-resolver")]
975            RefKind::Fixture(target) => {
976                let fixture_dir = self.opts.fixture_dir.as_ref().ok_or_else(|| {
977                    DistError::InvalidInput("fixture:// requires fixture_dir".into())
978                })?;
979                let raw = target.trim_start_matches('/');
980                let candidate = if raw.ends_with(".wasm") {
981                    fixture_dir.join(raw)
982                } else {
983                    fixture_dir.join(format!("{raw}.wasm"))
984                };
985                let bytes = fs::read(&candidate).map_err(|source| DistError::CacheError {
986                    path: candidate.display().to_string(),
987                    source,
988                })?;
989                let digest = digest_for_bytes(&bytes);
990                Ok(ArtifactDescriptor {
991                    artifact_type: ArtifactType::Component,
992                    source_kind: ArtifactSourceKind::Fixture,
993                    raw_ref: format!("fixture://{target}"),
994                    canonical_ref: format!("file://{}@{}", candidate.display(), digest),
995                    digest,
996                    media_type: WASM_CONTENT_TYPE.to_string(),
997                    size_bytes: bytes.len() as u64,
998                    created_at: None,
999                    annotations: serde_json::Map::new(),
1000                    manifest_digest: None,
1001                    resolved_via: ResolvedVia::Fixture,
1002                    signature_refs: Vec::new(),
1003                    sbom_refs: Vec::new(),
1004                })
1005            }
1006        }
1007    }
1008
1009    async fn resolve_oci_descriptor(
1010        &self,
1011        reference: &str,
1012    ) -> Result<ArtifactDescriptor, DistError> {
1013        self.resolve_oci_descriptor_with_client(reference, DefaultRegistryClient::default())
1014            .await
1015    }
1016
1017    async fn resolve_oci_descriptor_with_client(
1018        &self,
1019        reference: &str,
1020        client: DefaultRegistryClient,
1021    ) -> Result<ArtifactDescriptor, DistError> {
1022        let resolver = OciComponentResolver::with_client(client, self.oci_resolve_options());
1023        let resolved = resolver
1024            .resolve_descriptor(reference)
1025            .await
1026            .map_err(DistError::Oci)?;
1027        Ok(ArtifactDescriptor {
1028            artifact_type: ArtifactType::Component,
1029            source_kind: ArtifactSourceKind::Oci,
1030            raw_ref: format!("oci://{reference}"),
1031            canonical_ref: canonical_oci_ref(reference, &resolved.resolved_digest),
1032            digest: resolved.resolved_digest,
1033            media_type: normalize_content_type(Some(&resolved.media_type), WASM_CONTENT_TYPE),
1034            size_bytes: resolved.size_bytes,
1035            created_at: None,
1036            annotations: serde_json::Map::new(),
1037            manifest_digest: resolved.manifest_digest,
1038            resolved_via: if reference.contains(':') && !reference.contains("@sha256:") {
1039                ResolvedVia::TagResolution
1040            } else {
1041                ResolvedVia::Direct
1042            },
1043            signature_refs: Vec::new(),
1044            sbom_refs: Vec::new(),
1045        })
1046    }
1047
1048    fn oci_resolve_options(&self) -> ComponentResolveOptions {
1049        ComponentResolveOptions {
1050            allow_tags: self.opts.allow_tags,
1051            offline: self.opts.offline,
1052            cache_dir: self.opts.cache_dir.join("legacy-components"),
1053            ..Default::default()
1054        }
1055    }
1056
1057    async fn load_store_credentials(&self, tenant: &str) -> Result<StoreCredentials, DistError> {
1058        load_login(
1059            &self.opts.store_auth_path,
1060            &self.opts.store_state_path,
1061            tenant,
1062        )
1063        .await
1064        .map_err(|err| DistError::StoreAuth(err.to_string()))
1065    }
1066
1067    async fn greentic_biz_store_client(
1068        &self,
1069        tenant: &str,
1070    ) -> Result<DefaultRegistryClient, DistError> {
1071        let credentials = self.load_store_credentials(tenant).await?;
1072        Ok(DefaultRegistryClient::with_basic_auth(
1073            credentials.username,
1074            credentials.token,
1075        ))
1076    }
1077
1078    async fn resolve_greentic_biz_store_descriptor(
1079        &self,
1080        target: &str,
1081    ) -> Result<ArtifactDescriptor, DistError> {
1082        let parsed = parse_greentic_biz_store_target(target)?;
1083        let client = self.greentic_biz_store_client(&parsed.tenant).await?;
1084        let mut descriptor = self
1085            .resolve_oci_descriptor_with_client(&parsed.mapped_reference, client)
1086            .await?;
1087        descriptor.source_kind = ArtifactSourceKind::Store;
1088        descriptor.raw_ref = format!("store://{target}");
1089        descriptor.resolved_via = ResolvedVia::StoreMapping;
1090        Ok(descriptor)
1091    }
1092
1093    async fn pull_oci_with_source_and_client(
1094        &self,
1095        reference: &str,
1096        source: LegacyArtifactSource,
1097        component_id: String,
1098        client: DefaultRegistryClient,
1099    ) -> Result<ResolvedArtifact, DistError> {
1100        if self.opts.offline {
1101            return Err(DistError::Offline {
1102                reference: reference.to_string(),
1103            });
1104        }
1105        let resolver = OciComponentResolver::with_client(client, self.oci_resolve_options());
1106        let result = resolver
1107            .resolve_refs(&crate::oci_components::ComponentsExtension {
1108                refs: vec![reference.to_string()],
1109                mode: crate::oci_components::ComponentsMode::Eager,
1110            })
1111            .await
1112            .map_err(DistError::Oci)?;
1113        let resolved = result
1114            .into_iter()
1115            .next()
1116            .ok_or_else(|| DistError::InvalidRef {
1117                reference: reference.to_string(),
1118            })?;
1119        let resolved_digest = resolved.resolved_digest.clone();
1120        let resolved_bytes = fs::read(&resolved.path).map_err(|source| DistError::CacheError {
1121            path: resolved.path.display().to_string(),
1122            source,
1123        })?;
1124        let resolved = ResolvedArtifact::from_path(
1125            resolved_digest.clone(),
1126            self.cache
1127                .write_component(&resolved_digest, &resolved_bytes)
1128                .map_err(|source| DistError::CacheError {
1129                    path: self
1130                        .cache
1131                        .component_path(&resolved_digest)
1132                        .display()
1133                        .to_string(),
1134                    source,
1135                })?,
1136            resolve_component_id_from_cache(&resolved.path, &component_id),
1137            resolve_abi_version_from_cache(&resolved.path),
1138            resolve_describe_artifact_ref_from_cache(&resolved.path),
1139            file_size_if_exists(&resolved.path),
1140            Some(normalize_content_type(
1141                Some(&resolved.media_type),
1142                WASM_CONTENT_TYPE,
1143            )),
1144            resolved.fetched_from_network,
1145            source,
1146        );
1147        self.persist_cache_entry(&resolved)?;
1148        self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
1149        resolved.validate_payload()?;
1150        Ok(resolved)
1151    }
1152
1153    fn persist_cache_entry(&self, artifact: &ResolvedArtifact) -> Result<(), DistError> {
1154        let local_path = artifact
1155            .cache_path
1156            .clone()
1157            .or_else(|| artifact.wasm_path.clone())
1158            .unwrap_or_else(|| artifact.local_path.clone());
1159        let entry = CacheEntry {
1160            format_version: CACHE_FORMAT_VERSION,
1161            cache_key: artifact.cache_key.clone(),
1162            digest: artifact.descriptor.digest.clone(),
1163            media_type: artifact.descriptor.media_type.clone(),
1164            size_bytes: artifact.descriptor.size_bytes,
1165            artifact_type: artifact.descriptor.artifact_type.clone(),
1166            source_kind: artifact.descriptor.source_kind.clone(),
1167            raw_ref: artifact.descriptor.raw_ref.clone(),
1168            canonical_ref: artifact.descriptor.canonical_ref.clone(),
1169            fetched_at: artifact.fetched_at,
1170            last_accessed_at: unix_now(),
1171            last_verified_at: None,
1172            state: cache_entry_state_from_integrity(&artifact.integrity_state),
1173            advisory_epoch: None,
1174            signature_summary: None,
1175            local_path,
1176            source_snapshot: artifact.source_snapshot.clone(),
1177        };
1178        self.cache
1179            .write_entry(&entry)
1180            .map_err(|source| DistError::CacheError {
1181                path: self.cache.entry_path(&entry.digest).display().to_string(),
1182                source,
1183            })
1184    }
1185
1186    fn persist_verification_report(
1187        &self,
1188        digest: &str,
1189        report: &VerificationReport,
1190    ) -> Result<(), DistError> {
1191        let mut entry = self.stat_cache(digest)?;
1192        entry.last_verified_at = Some(unix_now());
1193        entry.advisory_epoch = report
1194            .advisory_version
1195            .as_ref()
1196            .and_then(|raw| raw.parse::<u64>().ok());
1197        entry.signature_summary = report
1198            .checks
1199            .iter()
1200            .find(|check| check.name == "signature_verified")
1201            .map(|check| {
1202                serde_json::json!({
1203                    "outcome": verification_outcome_name(&check.outcome),
1204                    "detail": check.detail,
1205                })
1206            });
1207        self.cache
1208            .write_entry(&entry)
1209            .map_err(|source| DistError::CacheError {
1210                path: self.cache.entry_path(&entry.digest).display().to_string(),
1211                source,
1212            })
1213    }
1214
1215    pub async fn resolve(
1216        &self,
1217        source: ArtifactSource,
1218        _policy: ResolvePolicy,
1219    ) -> Result<ArtifactDescriptor, DistError> {
1220        let mut current = source.raw_ref.clone();
1221        for _ in 0..8 {
1222            if let Some(injected) = &self.injected
1223                && let Some(result) = injected.resolve(&current).await?
1224            {
1225                if let InjectedResolution::Redirect(next) = result {
1226                    current = next;
1227                    continue;
1228                }
1229                let artifact = self.materialize_injected(result)?;
1230                return Ok(artifact.descriptor);
1231            }
1232
1233            return self.resolve_descriptor_from_reference(&current).await;
1234        }
1235        Err(DistError::InvalidInput(
1236            "too many injected redirect hops".to_string(),
1237        ))
1238    }
1239
1240    pub fn parse_source(&self, reference: &str) -> Result<ArtifactSource, DistError> {
1241        artifact_source_from_reference(reference, &self.opts)
1242    }
1243
1244    pub fn load_advisory_set(
1245        &self,
1246        bytes: &[u8],
1247        source: impl Into<String>,
1248    ) -> Result<AdvisorySet, DistError> {
1249        let mut advisory: AdvisorySet = serde_json::from_slice(bytes)?;
1250        advisory.source = source.into();
1251        Ok(advisory)
1252    }
1253
1254    pub fn apply_policy(
1255        &self,
1256        descriptor: &ArtifactDescriptor,
1257        advisory_set: Option<&AdvisorySet>,
1258        verification_policy: &VerificationPolicy,
1259    ) -> PreliminaryDecision {
1260        let checks = vec![
1261            check_digest_allowed(&descriptor.digest, advisory_set, verification_policy),
1262            check_media_type_allowed(&descriptor.media_type, verification_policy),
1263            check_issuer_allowed(
1264                issuer_from_descriptor(descriptor),
1265                advisory_set,
1266                verification_policy,
1267            ),
1268            check_operator_version_compatible(descriptor, advisory_set, verification_policy),
1269        ];
1270
1271        preliminary_decision_from_checks(checks)
1272    }
1273
1274    pub fn verify_artifact(
1275        &self,
1276        resolved_artifact: &ResolvedArtifact,
1277        advisory_set: Option<&AdvisorySet>,
1278        verification_policy: &VerificationPolicy,
1279    ) -> Result<VerificationReport, DistError> {
1280        let mut checks = self
1281            .apply_policy(
1282                &resolved_artifact.descriptor,
1283                advisory_set,
1284                verification_policy,
1285            )
1286            .checks;
1287
1288        checks.push(check_content_digest_match(resolved_artifact)?);
1289        checks.push(check_signature_present(
1290            &resolved_artifact.descriptor,
1291            verification_policy,
1292        ));
1293        checks.push(check_signature_verified(
1294            &resolved_artifact.descriptor,
1295            verification_policy,
1296        ));
1297        checks.push(check_sbom_present(
1298            &resolved_artifact.descriptor,
1299            verification_policy,
1300        ));
1301
1302        let report = verification_report_from_checks(
1303            &resolved_artifact.descriptor,
1304            advisory_set,
1305            verification_policy,
1306            self.stat_cache(&resolved_artifact.descriptor.digest)
1307                .ok()
1308                .as_ref(),
1309            checks,
1310        );
1311
1312        self.persist_verification_report(&resolved_artifact.descriptor.digest, &report)?;
1313        Ok(report)
1314    }
1315
1316    pub async fn stage_bundle(
1317        &self,
1318        input: &StageBundleInput,
1319        advisory_set: Option<&AdvisorySet>,
1320        verification_policy: &VerificationPolicy,
1321        cache_policy: CachePolicy,
1322    ) -> Result<StageBundleResult, IntegrationError> {
1323        let source = self
1324            .parse_source(&input.bundle_ref)
1325            .map_err(IntegrationError::from_dist_error)?;
1326        let descriptor = self
1327            .resolve(source, ResolvePolicy)
1328            .await
1329            .map_err(IntegrationError::from_dist_error)?;
1330        let resolved_artifact = self
1331            .fetch(&descriptor, cache_policy)
1332            .await
1333            .map_err(IntegrationError::from_dist_error)?;
1334        let verification_report = self
1335            .verify_artifact(&resolved_artifact, advisory_set, verification_policy)
1336            .map_err(IntegrationError::from_dist_error)?;
1337        if !verification_report.passed {
1338            return Err(IntegrationError::from_verification_report(
1339                verification_report,
1340            ));
1341        }
1342        let cache_entry = self
1343            .stat_cache(&descriptor.digest)
1344            .map_err(IntegrationError::from_dist_error)?;
1345        let bundle_id = bundle_id_for_digest(&descriptor.digest);
1346        self.persist_bundle_record(&BundleRecord {
1347            bundle_id: bundle_id.clone(),
1348            cache_key: cache_entry.cache_key.clone(),
1349            canonical_ref: descriptor.canonical_ref.clone(),
1350            source_kind: descriptor.source_kind.clone(),
1351            fetched_at: cache_entry.fetched_at,
1352            lifecycle_state: BundleLifecycleState::Staged,
1353        })
1354        .map_err(IntegrationError::from_dist_error)?;
1355
1356        Ok(StageBundleResult {
1357            bundle_id,
1358            canonical_ref: descriptor.canonical_ref.clone(),
1359            descriptor,
1360            resolved_artifact,
1361            verification_report,
1362            cache_entry,
1363            stage_audit_fields: StageAuditFields {
1364                staged_at: unix_now(),
1365                requested_access_mode: input.requested_access_mode.clone(),
1366                verification_policy_ref: input.verification_policy_ref.clone(),
1367                cache_policy_ref: input.cache_policy_ref.clone(),
1368                tenant: input.tenant.clone(),
1369                team: input.team.clone(),
1370            },
1371        })
1372    }
1373
1374    pub fn warm_bundle(
1375        &self,
1376        input: &WarmBundleInput,
1377        advisory_set: Option<&AdvisorySet>,
1378        verification_policy: &VerificationPolicy,
1379    ) -> Result<WarmBundleResult, IntegrationError> {
1380        let expected_bundle_id = bundle_id_for_digest(&input.cache_key);
1381        if input.bundle_id != expected_bundle_id {
1382            return Err(IntegrationError {
1383                code: IntegrationErrorCode::InvalidReference,
1384                summary: format!(
1385                    "bundle id {} does not match cache key {}",
1386                    input.bundle_id, input.cache_key
1387                ),
1388                retryable: false,
1389                details: Some(serde_json::json!({
1390                    "bundle_id": input.bundle_id,
1391                    "expected_bundle_id": expected_bundle_id,
1392                    "cache_key": input.cache_key,
1393                })),
1394            });
1395        }
1396
1397        let mut resolved_artifact = self
1398            .open_cached(&input.cache_key)
1399            .map_err(IntegrationError::from_dist_error)?;
1400        if let Some(expected_operator_version) = &input.expected_operator_version {
1401            resolved_artifact.descriptor.annotations.insert(
1402                "operator_version".to_string(),
1403                serde_json::Value::String(expected_operator_version.clone()),
1404            );
1405        }
1406        let verification_report = self
1407            .verify_artifact(&resolved_artifact, advisory_set, verification_policy)
1408            .map_err(IntegrationError::from_dist_error)?;
1409        if !verification_report.passed {
1410            return Err(IntegrationError::from_verification_report(
1411                verification_report,
1412            ));
1413        }
1414        let opened = self.artifact_opener.open(
1415            &resolved_artifact,
1416            &ArtifactOpenRequest {
1417                bundle_id: input.bundle_id.clone(),
1418                dry_run: input.dry_run,
1419                smoke_test: input.smoke_test,
1420            },
1421        )?;
1422
1423        Ok(WarmBundleResult {
1424            bundle_id: input.bundle_id.clone(),
1425            bundle_manifest_summary: opened.bundle_manifest_summary,
1426            bundle_open_mode: opened.bundle_open_mode,
1427            warnings: verification_report
1428                .warnings
1429                .iter()
1430                .cloned()
1431                .chain(opened.warnings)
1432                .collect(),
1433            errors: verification_report.errors.clone(),
1434            verification_report,
1435            warm_audit_fields: WarmAuditFields {
1436                warmed_at: unix_now(),
1437                smoke_test: input.smoke_test,
1438                dry_run: input.dry_run,
1439                reopened_from_cache: true,
1440            },
1441        })
1442    }
1443
1444    pub fn rollback_bundle(
1445        &self,
1446        input: &RollbackBundleInput,
1447        advisory_set: Option<&AdvisorySet>,
1448        verification_policy: &VerificationPolicy,
1449    ) -> Result<RollbackBundleResult, IntegrationError> {
1450        let bundle_record = self.stat_bundle(&input.target_bundle_id).ok();
1451        let digest = if let Some(record) = &bundle_record {
1452            normalize_digest(&record.cache_key)
1453        } else {
1454            digest_from_bundle_id(&input.target_bundle_id).ok_or_else(|| IntegrationError {
1455                code: IntegrationErrorCode::InvalidReference,
1456                summary: format!("invalid bundle id {}", input.target_bundle_id),
1457                retryable: false,
1458                details: Some(serde_json::json!({
1459                    "bundle_id": input.target_bundle_id,
1460                })),
1461            })?
1462        };
1463
1464        if let Some(expected_cache_key) = &input.expected_cache_key
1465            && expected_cache_key != &digest
1466        {
1467            return Err(IntegrationError {
1468                code: IntegrationErrorCode::InvalidReference,
1469                summary: format!(
1470                    "expected cache key {} does not match rollback bundle digest {}",
1471                    expected_cache_key, digest
1472                ),
1473                retryable: false,
1474                details: Some(serde_json::json!({
1475                    "bundle_id": input.target_bundle_id,
1476                    "expected_cache_key": expected_cache_key,
1477                    "actual_cache_key": digest,
1478                })),
1479            });
1480        }
1481
1482        let resolved_artifact = self
1483            .open_cached(&digest)
1484            .map_err(IntegrationError::from_dist_error)?;
1485        let verification_report = self
1486            .verify_artifact(&resolved_artifact, advisory_set, verification_policy)
1487            .map_err(IntegrationError::from_dist_error)?;
1488        if !verification_report.passed {
1489            return Err(IntegrationError::from_verification_report(
1490                verification_report,
1491            ));
1492        }
1493        let cache_entry = self
1494            .stat_cache(&digest)
1495            .map_err(IntegrationError::from_dist_error)?;
1496
1497        Ok(RollbackBundleResult {
1498            bundle_id: input.target_bundle_id.clone(),
1499            reopened_from_cache: true,
1500            cache_entry,
1501            verification_report,
1502            rollback_audit_fields: RollbackAuditFields {
1503                rolled_back_at: unix_now(),
1504                reopened_from_cache: true,
1505                expected_cache_key: input.expected_cache_key.clone(),
1506            },
1507        })
1508    }
1509
1510    pub fn stat_bundle(&self, bundle_id: &str) -> Result<BundleRecord, DistError> {
1511        self.cache.read_bundle_record(bundle_id).map_err(|source| {
1512            if source.kind() == std::io::ErrorKind::NotFound {
1513                DistError::NotFound {
1514                    reference: bundle_id.to_string(),
1515                }
1516            } else {
1517                DistError::CacheError {
1518                    path: self
1519                        .cache
1520                        .bundle_record_path(bundle_id)
1521                        .display()
1522                        .to_string(),
1523                    source,
1524                }
1525            }
1526        })
1527    }
1528
1529    pub fn list_bundles(&self) -> Result<Vec<BundleRecord>, DistError> {
1530        self.cache
1531            .list_bundle_records()
1532            .map_err(|source| DistError::CacheError {
1533                path: self.cache.bundle_records_root().display().to_string(),
1534                source,
1535            })
1536    }
1537
1538    pub fn set_bundle_state(
1539        &self,
1540        bundle_id: &str,
1541        lifecycle_state: BundleLifecycleState,
1542    ) -> Result<BundleRecord, DistError> {
1543        let mut record = self.stat_bundle(bundle_id)?;
1544        record.lifecycle_state = lifecycle_state;
1545        self.persist_bundle_record(&record)?;
1546        Ok(record)
1547    }
1548
1549    pub async fn fetch(
1550        &self,
1551        descriptor: &ArtifactDescriptor,
1552        _cache_policy: CachePolicy,
1553    ) -> Result<ResolvedArtifact, DistError> {
1554        if let Ok(existing) = self.open_cached(&descriptor.cache_key()) {
1555            return Ok(existing);
1556        }
1557
1558        let raw = descriptor.raw_ref.as_str();
1559        let artifact = match descriptor.source_kind {
1560            ArtifactSourceKind::CacheDigest => self.open_cached(&descriptor.digest)?,
1561            ArtifactSourceKind::Https => self.fetch_http(raw).await?,
1562            ArtifactSourceKind::File => self.ingest_file(Path::new(raw)).await?,
1563            ArtifactSourceKind::Oci => {
1564                let reference = descriptor
1565                    .canonical_ref
1566                    .trim_start_matches("oci://")
1567                    .to_string();
1568                self.pull_oci(&reference).await?
1569            }
1570            ArtifactSourceKind::Repo => {
1571                self.resolve_repo_ref(raw.trim_start_matches("repo://"))
1572                    .await?
1573            }
1574            ArtifactSourceKind::Store => {
1575                self.resolve_store_ref(raw.trim_start_matches("store://"))
1576                    .await?
1577            }
1578            #[cfg(feature = "fixture-resolver")]
1579            ArtifactSourceKind::Fixture => {
1580                self.resolve_fixture_ref(raw.trim_start_matches("fixture://"))
1581                    .await?
1582            }
1583            #[cfg(not(feature = "fixture-resolver"))]
1584            ArtifactSourceKind::Fixture => {
1585                return Err(DistError::InvalidInput(
1586                    "fixture resolver feature is disabled".to_string(),
1587                ));
1588            }
1589        };
1590
1591        if !descriptor.digest.is_empty() && artifact.descriptor.digest != descriptor.digest {
1592            return Err(DistError::CorruptArtifact {
1593                reference: descriptor.digest.clone(),
1594                reason: format!(
1595                    "fetched digest {} did not match resolved descriptor {}",
1596                    artifact.descriptor.digest, descriptor.digest
1597                ),
1598            });
1599        }
1600        Ok(artifact)
1601    }
1602
1603    pub fn open_cached(&self, digest_or_cache_key: &str) -> Result<ResolvedArtifact, DistError> {
1604        let digest = normalize_digest(digest_or_cache_key);
1605        let entry = self.stat_cache(&digest)?;
1606        let path = self
1607            .cache
1608            .existing_component(&entry.digest)
1609            .ok_or_else(|| DistError::CorruptArtifact {
1610                reference: digest_or_cache_key.to_string(),
1611                reason: "cache metadata exists but cached blob is missing".to_string(),
1612            })?;
1613        let mut artifact = ResolvedArtifact::from_path(
1614            entry.digest.clone(),
1615            path.clone(),
1616            component_id_from_descriptor(&entry),
1617            None,
1618            None,
1619            Some(entry.size_bytes),
1620            Some(entry.media_type.clone()),
1621            false,
1622            legacy_source_from_entry(&entry),
1623        );
1624        artifact.descriptor = descriptor_from_entry(&entry);
1625        artifact.cache_key = entry.cache_key.clone();
1626        artifact.local_path = path.clone();
1627        artifact.fetched_at = entry.fetched_at;
1628        artifact.integrity_state = integrity_state_from_entry(&entry.state);
1629        artifact.source_snapshot = entry.source_snapshot.clone();
1630        artifact.cache_path = Some(path.clone());
1631        artifact.wasm_path = Some(path);
1632        Ok(artifact)
1633    }
1634
1635    pub fn stat_cache(&self, digest_or_cache_key: &str) -> Result<CacheEntry, DistError> {
1636        let digest = normalize_digest(digest_or_cache_key);
1637        let entry = self.cache.read_entry(&digest).map_err(|source| {
1638            if source.kind() == std::io::ErrorKind::NotFound {
1639                DistError::NotFound {
1640                    reference: digest_or_cache_key.to_string(),
1641                }
1642            } else {
1643                DistError::CacheError {
1644                    path: self.cache.entry_path(&digest).display().to_string(),
1645                    source,
1646                }
1647            }
1648        })?;
1649        let _ = self.cache.touch_last_used(&entry.digest);
1650        Ok(entry)
1651    }
1652
1653    #[deprecated(note = "use evaluate_retention/apply_retention for explicit retention decisions")]
1654    pub fn evict_cache(&self, digests: &[String]) -> Result<RetentionReport, DistError> {
1655        let mut report = RetentionReport {
1656            scanned_entries: digests.len(),
1657            ..RetentionReport::default()
1658        };
1659        let entries = digests
1660            .iter()
1661            .filter_map(|digest| self.stat_cache(digest).ok())
1662            .collect::<Vec<_>>();
1663        let input = RetentionInput {
1664            entries,
1665            active_bundle_ids: Vec::new(),
1666            staged_bundle_ids: Vec::new(),
1667            warming_bundle_ids: Vec::new(),
1668            ready_bundle_ids: Vec::new(),
1669            draining_bundle_ids: Vec::new(),
1670            session_referenced_bundle_ids: Vec::new(),
1671            max_cache_bytes: 0,
1672            max_entry_age: Some(0),
1673            minimum_rollback_depth: 0,
1674            environment: RetentionEnvironment::Dev,
1675        };
1676        let outcome = self.apply_retention(&input)?;
1677        report.evicted = outcome.report.evicted;
1678        report.bytes_reclaimed = outcome.report.bytes_reclaimed;
1679        report.refusals = digests
1680            .iter()
1681            .filter(|digest| self.stat_cache(digest).is_err())
1682            .cloned()
1683            .chain(outcome.report.refusals)
1684            .collect();
1685        report.kept = report
1686            .scanned_entries
1687            .saturating_sub(report.evicted + report.refusals.len());
1688        Ok(report)
1689    }
1690
1691    #[deprecated(note = "use parse_source + resolve + fetch instead")]
1692    pub async fn resolve_ref(&self, reference: &str) -> Result<ResolvedArtifact, DistError> {
1693        let source = self.parse_source(reference)?;
1694        let descriptor = self.resolve(source, ResolvePolicy).await?;
1695        self.fetch(&descriptor, CachePolicy).await
1696    }
1697
1698    #[allow(deprecated)]
1699    #[deprecated(note = "use parse_source + resolve + fetch instead")]
1700    pub async fn resolve_ref_request(
1701        &self,
1702        req: ResolveRefRequest,
1703    ) -> Result<ResolvedArtifact, DistError> {
1704        self.resolve_ref(&req.reference).await
1705    }
1706
1707    #[allow(deprecated)]
1708    #[deprecated(note = "use parse_source + resolve + fetch instead")]
1709    pub async fn resolve_component(
1710        &self,
1711        req: ResolveComponentRequest,
1712    ) -> Result<ResolvedArtifact, DistError> {
1713        self.resolve_ref(&req.reference).await
1714    }
1715
1716    #[allow(deprecated)]
1717    #[deprecated(note = "use parse_source + resolve + fetch or open_cached instead")]
1718    pub async fn ensure_cached(&self, reference: &str) -> Result<ResolvedArtifact, DistError> {
1719        let resolved = self.resolve_ref(reference).await?;
1720        if resolved.wasm_bytes.is_some() {
1721            return Ok(resolved);
1722        }
1723        if let Some(path) = &resolved.wasm_path
1724            && path.exists()
1725        {
1726            return Ok(resolved);
1727        }
1728        Err(DistError::NotFound {
1729            reference: reference.to_string(),
1730        })
1731    }
1732
1733    pub async fn fetch_digest(&self, digest: &str) -> Result<PathBuf, DistError> {
1734        let normalized = normalize_digest(digest);
1735        self.cache
1736            .existing_component(&normalized)
1737            .ok_or(DistError::NotFound {
1738                reference: normalized,
1739            })
1740    }
1741
1742    pub async fn pull_lock(&self, lock_path: &Path) -> Result<Vec<ResolvedArtifact>, DistError> {
1743        let contents = fs::read_to_string(lock_path).map_err(|source| DistError::CacheError {
1744            path: lock_path.display().to_string(),
1745            source,
1746        })?;
1747        let entries = parse_lockfile(&contents)?;
1748        let mut resolved = Vec::with_capacity(entries.len());
1749        for entry in entries {
1750            let resolved_item = if let Some(digest) = entry.digest.as_ref() {
1751                if let Ok(item) = self.open_cached(digest) {
1752                    item
1753                } else {
1754                    let reference = entry
1755                        .reference
1756                        .clone()
1757                        .ok_or_else(|| DistError::InvalidInput("lock entry missing ref".into()))?;
1758                    let source = self.parse_source(&reference)?;
1759                    let mut descriptor = self.resolve(source, ResolvePolicy).await?;
1760                    if !descriptor.digest.is_empty() && descriptor.digest != *digest {
1761                        return Err(DistError::CorruptArtifact {
1762                            reference: reference.clone(),
1763                            reason: format!(
1764                                "lock digest {} did not match resolved descriptor {}",
1765                                digest, descriptor.digest
1766                            ),
1767                        });
1768                    }
1769                    descriptor.digest = digest.clone();
1770                    self.fetch(&descriptor, CachePolicy).await?
1771                }
1772            } else {
1773                let reference = entry
1774                    .reference
1775                    .clone()
1776                    .ok_or_else(|| DistError::InvalidInput("lock entry missing ref".into()))?;
1777                let source = self.parse_source(&reference)?;
1778                let descriptor = self.resolve(source, ResolvePolicy).await?;
1779                self.fetch(&descriptor, CachePolicy).await?
1780            };
1781            resolved.push(resolved_item);
1782        }
1783        Ok(resolved)
1784    }
1785
    /// Returns the digests reported by the underlying cache's `list_digests`.
    pub fn list_cache(&self) -> Vec<String> {
        self.cache.list_digests()
    }
1789
1790    pub fn list_cache_entries(&self) -> Vec<CacheEntry> {
1791        self.cache
1792            .list_digests()
1793            .into_iter()
1794            .filter_map(|digest| self.stat_cache(&digest).ok())
1795            .collect()
1796    }
1797
1798    pub fn evaluate_retention(
1799        &self,
1800        input: &RetentionInput,
1801    ) -> Result<RetentionOutcome, DistError> {
1802        let decisions = retention_decisions(input);
1803        let mut report = RetentionReport {
1804            scanned_entries: decisions.len(),
1805            ..RetentionReport::default()
1806        };
1807        for decision in &decisions {
1808            match decision.decision {
1809                RetentionDisposition::Keep => report.kept += 1,
1810                RetentionDisposition::Evict => report.evicted += 1,
1811                RetentionDisposition::Protect => report.protected += 1,
1812            }
1813        }
1814        Ok(RetentionOutcome { decisions, report })
1815    }
1816
1817    pub fn apply_retention(&self, input: &RetentionInput) -> Result<RetentionOutcome, DistError> {
1818        let mut outcome = self.evaluate_retention(input)?;
1819        let decision_map = input
1820            .entries
1821            .iter()
1822            .map(|entry| (entry.cache_key.clone(), entry.size_bytes))
1823            .collect::<std::collections::BTreeMap<_, _>>();
1824        let evicted = outcome
1825            .decisions
1826            .iter()
1827            .filter(|decision| matches!(decision.decision, RetentionDisposition::Evict))
1828            .map(|decision| decision.cache_key.clone())
1829            .collect::<Vec<_>>();
1830        for cache_key in evicted {
1831            let digest = normalize_digest(&cache_key);
1832            let dir = self.cache.component_dir(&digest);
1833            if dir.exists() {
1834                fs::remove_dir_all(&dir).map_err(|source| DistError::CacheError {
1835                    path: dir.display().to_string(),
1836                    source,
1837                })?;
1838                self.cache
1839                    .remove_bundle_record(&bundle_id_for_digest(&digest))
1840                    .ok();
1841                outcome.report.bytes_reclaimed +=
1842                    decision_map.get(&cache_key).copied().unwrap_or(0);
1843            } else {
1844                outcome.report.refusals.push(cache_key);
1845            }
1846        }
1847        outcome.report.kept = outcome
1848            .decisions
1849            .iter()
1850            .filter(|decision| matches!(decision.decision, RetentionDisposition::Keep))
1851            .count();
1852        outcome.report.protected = outcome
1853            .decisions
1854            .iter()
1855            .filter(|decision| matches!(decision.decision, RetentionDisposition::Protect))
1856            .count();
1857        outcome.report.evicted = outcome
1858            .decisions
1859            .iter()
1860            .filter(|decision| matches!(decision.decision, RetentionDisposition::Evict))
1861            .count()
1862            .saturating_sub(outcome.report.refusals.len());
1863        Ok(outcome)
1864    }
1865
1866    fn persist_bundle_record(&self, record: &BundleRecord) -> Result<(), DistError> {
1867        self.cache
1868            .write_bundle_record(record)
1869            .map_err(|source| DistError::CacheError {
1870                path: self
1871                    .cache
1872                    .bundle_record_path(&record.bundle_id)
1873                    .display()
1874                    .to_string(),
1875                source,
1876            })
1877    }
1878
1879    #[deprecated(note = "use evict_cache or apply_retention instead")]
1880    pub fn remove_cached(&self, digests: &[String]) -> Result<(), DistError> {
1881        for digest in digests {
1882            for dir in [
1883                self.cache.component_dir(digest),
1884                self.cache.legacy_component_dir(digest),
1885            ] {
1886                if dir.exists() {
1887                    fs::remove_dir_all(&dir).map_err(|source| DistError::CacheError {
1888                        path: dir.display().to_string(),
1889                        source,
1890                    })?;
1891                }
1892            }
1893            self.cache
1894                .remove_bundle_record(&bundle_id_for_digest(digest))
1895                .ok();
1896        }
1897        Ok(())
1898    }
1899
1900    #[deprecated(note = "use evaluate_retention/apply_retention for cache lifecycle management")]
1901    pub fn gc(&self) -> Result<Vec<String>, DistError> {
1902        let mut removed = Vec::new();
1903        for digest in self.cache.list_digests() {
1904            let primary = self.cache.component_path(&digest);
1905            let legacy = self.cache.legacy_component_path(&digest);
1906            if !primary.exists() && !legacy.exists() {
1907                let dir = self.cache.component_dir(&digest);
1908                let legacy_dir = self.cache.legacy_component_dir(&digest);
1909                fs::remove_dir_all(&dir).ok();
1910                fs::remove_dir_all(&legacy_dir).ok();
1911                self.cache
1912                    .remove_bundle_record(&bundle_id_for_digest(&digest))
1913                    .ok();
1914                removed.push(digest);
1915            }
1916        }
1917        for record in self.list_bundles()? {
1918            if self.stat_cache(&record.cache_key).is_err() {
1919                self.cache.remove_bundle_record(&record.bundle_id).ok();
1920                removed.push(record.cache_key);
1921            }
1922        }
1923        removed.sort();
1924        removed.dedup();
1925        Ok(removed)
1926    }
1927
1928    async fn fetch_http(&self, url: &str) -> Result<ResolvedArtifact, DistError> {
1929        if self.opts.offline {
1930            return Err(DistError::Offline {
1931                reference: url.to_string(),
1932            });
1933        }
1934        let request_url = ensure_secure_http_url(url, self.opts.allow_insecure_local_http)?;
1935        let bytes = self
1936            .http
1937            .get(request_url.clone())
1938            .send()
1939            .await
1940            .map_err(|err| DistError::Network(err.to_string()))?;
1941        let response = bytes
1942            .error_for_status()
1943            .map_err(|err| DistError::Network(err.to_string()))?;
1944        let content_type = response
1945            .headers()
1946            .get(reqwest::header::CONTENT_TYPE)
1947            .and_then(|v| v.to_str().ok())
1948            .map(str::to_string)
1949            .or_else(|| Some(WASM_CONTENT_TYPE.to_string()));
1950        let bytes = response
1951            .bytes()
1952            .await
1953            .map_err(|err| DistError::Network(err.to_string()))?;
1954        let digest = digest_for_bytes(&bytes);
1955        let path = self
1956            .cache
1957            .write_component(&digest, &bytes)
1958            .map_err(|source| DistError::CacheError {
1959                path: self.cache.component_path(&digest).display().to_string(),
1960                source,
1961            })?;
1962        let resolved = ResolvedArtifact::from_path(
1963            digest,
1964            path,
1965            component_id_from_ref(&RefKind::Http(request_url.to_string())),
1966            None,
1967            None,
1968            Some(bytes.len() as u64),
1969            content_type,
1970            true,
1971            LegacyArtifactSource::Http(request_url.to_string()),
1972        );
1973        self.persist_cache_entry(&resolved)?;
1974        self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
1975        resolved.validate_payload()?;
1976        Ok(resolved)
1977    }
1978
1979    async fn ingest_file(&self, path: &Path) -> Result<ResolvedArtifact, DistError> {
1980        let bytes = fs::read(path).map_err(|source| DistError::CacheError {
1981            path: path.display().to_string(),
1982            source,
1983        })?;
1984        let digest = digest_for_bytes(&bytes);
1985        let cached = self
1986            .cache
1987            .write_component(&digest, &bytes)
1988            .map_err(|source| DistError::CacheError {
1989                path: self.cache.component_path(&digest).display().to_string(),
1990                source,
1991            })?;
1992        let resolved = ResolvedArtifact::from_path(
1993            digest,
1994            cached,
1995            component_id_from_ref(&RefKind::File(path.to_path_buf())),
1996            None,
1997            source_sidecar_describe_ref(path),
1998            Some(bytes.len() as u64),
1999            Some(WASM_CONTENT_TYPE.to_string()),
2000            true,
2001            LegacyArtifactSource::File(path.to_path_buf()),
2002        );
2003        self.persist_cache_entry(&resolved)?;
2004        self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
2005        resolved.validate_payload()?;
2006        Ok(resolved)
2007    }
2008
2009    async fn pull_oci(&self, reference: &str) -> Result<ResolvedArtifact, DistError> {
2010        let component_id = component_id_from_ref(&RefKind::Oci(reference.to_string()));
2011        self.pull_oci_with_source(
2012            reference,
2013            LegacyArtifactSource::Oci(reference.to_string()),
2014            component_id,
2015        )
2016        .await
2017    }
2018
2019    async fn pull_oci_with_source(
2020        &self,
2021        reference: &str,
2022        source: LegacyArtifactSource,
2023        component_id: String,
2024    ) -> Result<ResolvedArtifact, DistError> {
2025        if self.opts.offline {
2026            return Err(DistError::Offline {
2027                reference: reference.to_string(),
2028            });
2029        }
2030        let result = self
2031            .oci
2032            .resolve_refs(&crate::oci_components::ComponentsExtension {
2033                refs: vec![reference.to_string()],
2034                mode: crate::oci_components::ComponentsMode::Eager,
2035            })
2036            .await
2037            .map_err(DistError::Oci)?;
2038        let resolved = result
2039            .into_iter()
2040            .next()
2041            .ok_or_else(|| DistError::InvalidRef {
2042                reference: reference.to_string(),
2043            })?;
2044        let resolved_digest = resolved.resolved_digest.clone();
2045        let resolved_bytes = fs::read(&resolved.path).map_err(|source| DistError::CacheError {
2046            path: resolved.path.display().to_string(),
2047            source,
2048        })?;
2049        let resolved = ResolvedArtifact::from_path(
2050            resolved_digest.clone(),
2051            self.cache
2052                .write_component(&resolved_digest, &resolved_bytes)
2053                .map_err(|source| DistError::CacheError {
2054                    path: self
2055                        .cache
2056                        .component_path(&resolved_digest)
2057                        .display()
2058                        .to_string(),
2059                    source,
2060                })?,
2061            resolve_component_id_from_cache(&resolved.path, &component_id),
2062            resolve_abi_version_from_cache(&resolved.path),
2063            resolve_describe_artifact_ref_from_cache(&resolved.path),
2064            file_size_if_exists(&resolved.path),
2065            Some(normalize_content_type(
2066                Some(&resolved.media_type),
2067                WASM_CONTENT_TYPE,
2068            )),
2069            resolved.fetched_from_network,
2070            source,
2071        );
2072        self.persist_cache_entry(&resolved)?;
2073        self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
2074        resolved.validate_payload()?;
2075        Ok(resolved)
2076    }
2077
2078    async fn resolve_repo_ref(&self, target: &str) -> Result<ResolvedArtifact, DistError> {
2079        if self.opts.repo_registry_base.is_none() {
2080            return Err(DistError::ResolutionUnavailable {
2081                reference: format!("repo://{target}"),
2082            });
2083        }
2084        let mapped = map_registry_target(target, self.opts.repo_registry_base.as_deref())
2085            .ok_or_else(|| DistError::Unauthorized {
2086                target: format!("repo://{target}"),
2087            })?;
2088        self.pull_oci_with_source(
2089            &mapped,
2090            LegacyArtifactSource::Repo(format!("repo://{target}")),
2091            target.to_string(),
2092        )
2093        .await
2094    }
2095
2096    async fn resolve_store_ref(&self, target: &str) -> Result<ResolvedArtifact, DistError> {
2097        if is_greentic_biz_store_target(target) {
2098            let parsed = parse_greentic_biz_store_target(target)?;
2099            let client = self.greentic_biz_store_client(&parsed.tenant).await?;
2100            return self
2101                .pull_oci_with_source_and_client(
2102                    &parsed.mapped_reference,
2103                    LegacyArtifactSource::Store(format!("store://{target}")),
2104                    target.to_string(),
2105                    client,
2106                )
2107                .await;
2108        }
2109        if self.opts.store_registry_base.is_none() {
2110            return Err(DistError::ResolutionUnavailable {
2111                reference: format!("store://{target}"),
2112            });
2113        }
2114        let mapped = map_registry_target(target, self.opts.store_registry_base.as_deref())
2115            .ok_or_else(|| DistError::Unauthorized {
2116                target: format!("store://{target}"),
2117            })?;
2118        self.pull_oci_with_source(
2119            &mapped,
2120            LegacyArtifactSource::Store(format!("store://{target}")),
2121            target.to_string(),
2122        )
2123        .await
2124    }
2125
2126    #[cfg(feature = "fixture-resolver")]
2127    async fn resolve_fixture_ref(&self, target: &str) -> Result<ResolvedArtifact, DistError> {
2128        let fixture_dir = self
2129            .opts
2130            .fixture_dir
2131            .as_ref()
2132            .ok_or_else(|| DistError::InvalidInput("fixture:// requires fixture_dir".into()))?;
2133        let raw = target.trim_start_matches('/');
2134        let candidate = if raw.ends_with(".wasm") {
2135            fixture_dir.join(raw)
2136        } else {
2137            fixture_dir.join(format!("{raw}.wasm"))
2138        };
2139        if !candidate.exists() {
2140            return Err(DistError::NotFound {
2141                reference: format!("fixture://{target}"),
2142            });
2143        }
2144        self.ingest_file(&candidate).await
2145    }
2146
2147    fn materialize_injected(
2148        &self,
2149        resolved: InjectedResolution,
2150    ) -> Result<ResolvedArtifact, DistError> {
2151        let artifact = match resolved {
2152            InjectedResolution::Redirect(_) => {
2153                return Err(DistError::InvalidInput(
2154                    "unexpected redirect during materialization".into(),
2155                ));
2156            }
2157            InjectedResolution::WasmBytes {
2158                resolved_digest,
2159                wasm_bytes,
2160                component_id,
2161                abi_version,
2162                source,
2163            } => {
2164                let legacy_source = legacy_source_from_public(&source);
2165                let path = self
2166                    .cache
2167                    .write_component(&resolved_digest, &wasm_bytes)
2168                    .map_err(|source| DistError::CacheError {
2169                        path: self
2170                            .cache
2171                            .component_path(&resolved_digest)
2172                            .display()
2173                            .to_string(),
2174                        source,
2175                    })?;
2176                ResolvedArtifact::from_path(
2177                    resolved_digest,
2178                    path,
2179                    component_id,
2180                    abi_version,
2181                    None,
2182                    Some(wasm_bytes.len() as u64),
2183                    Some(WASM_CONTENT_TYPE.to_string()),
2184                    true,
2185                    legacy_source,
2186                )
2187            }
2188            InjectedResolution::WasmPath {
2189                resolved_digest,
2190                wasm_path,
2191                component_id,
2192                abi_version,
2193                source,
2194            } => {
2195                let legacy_source = legacy_source_from_public(&source);
2196                ResolvedArtifact::from_path(
2197                    resolved_digest,
2198                    wasm_path.clone(),
2199                    component_id,
2200                    abi_version,
2201                    resolve_describe_artifact_ref_from_cache(&wasm_path),
2202                    file_size_if_exists(&wasm_path),
2203                    Some(WASM_CONTENT_TYPE.to_string()),
2204                    false,
2205                    legacy_source,
2206                )
2207            }
2208        };
2209        self.persist_cache_entry(&artifact)?;
2210        self.enforce_cache_cap(Some(&artifact.descriptor.digest))?;
2211        artifact.validate_payload()?;
2212        Ok(artifact)
2213    }
2214
2215    fn enforce_cache_cap(&self, current_digest: Option<&str>) -> Result<(), DistError> {
2216        let bundle_records = self.list_bundles()?;
2217        let mut staged_bundle_ids = bundle_records
2218            .iter()
2219            .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Staged))
2220            .map(|record| record.bundle_id.clone())
2221            .collect::<Vec<_>>();
2222        let warming_bundle_ids = bundle_records
2223            .iter()
2224            .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Warming))
2225            .map(|record| record.bundle_id.clone())
2226            .collect::<Vec<_>>();
2227        let ready_bundle_ids = bundle_records
2228            .iter()
2229            .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Ready))
2230            .map(|record| record.bundle_id.clone())
2231            .collect::<Vec<_>>();
2232        let draining_bundle_ids = bundle_records
2233            .iter()
2234            .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Draining))
2235            .map(|record| record.bundle_id.clone())
2236            .collect::<Vec<_>>();
2237        if let Some(current_digest) = current_digest {
2238            staged_bundle_ids.push(bundle_id_for_digest(current_digest));
2239        }
2240        self.apply_retention(&RetentionInput {
2241            entries: self.list_cache_entries(),
2242            active_bundle_ids: Vec::new(),
2243            staged_bundle_ids,
2244            warming_bundle_ids,
2245            ready_bundle_ids,
2246            draining_bundle_ids,
2247            session_referenced_bundle_ids: Vec::new(),
2248            max_cache_bytes: self.opts.cache_max_bytes,
2249            max_entry_age: None,
2250            minimum_rollback_depth: 0,
2251            environment: RetentionEnvironment::Dev,
2252        })
2253        .map(|_| ())
2254    }
2255
2256    pub async fn pull_oci_with_details(
2257        &self,
2258        reference: &str,
2259    ) -> Result<OciCacheInspection, DistError> {
2260        if self.opts.offline {
2261            return Err(DistError::Offline {
2262                reference: reference.to_string(),
2263            });
2264        }
2265        let result = self
2266            .oci
2267            .resolve_refs(&crate::oci_components::ComponentsExtension {
2268                refs: vec![reference.to_string()],
2269                mode: crate::oci_components::ComponentsMode::Eager,
2270            })
2271            .await
2272            .map_err(DistError::Oci)?;
2273        let resolved = result
2274            .into_iter()
2275            .next()
2276            .ok_or_else(|| DistError::InvalidRef {
2277                reference: reference.to_string(),
2278            })?;
2279        let cache_dir = resolved
2280            .path
2281            .parent()
2282            .map(|p| p.to_path_buf())
2283            .ok_or_else(|| DistError::InvalidInput("cache path missing parent".into()))?;
2284        Ok(OciCacheInspection {
2285            digest: resolved.resolved_digest,
2286            cache_dir,
2287            selected_media_type: resolved.media_type,
2288            fetched: resolved.fetched_from_network,
2289        })
2290    }
2291}
2292
/// Returns the default root directory of the on-disk distribution cache.
///
/// Resolution order:
/// 1. `$GREENTIC_HOME/cache/distribution`
/// 2. `$HOME/.greentic/cache/distribution`
/// 3. `.greentic/cache/distribution`, relative to the working directory
///
/// Variables are read with `var_os`, so directories whose names are not valid
/// UTF-8 are honoured instead of being skipped. A variable that is set but
/// empty is treated as unset; otherwise an empty `GREENTIC_HOME` would yield
/// the working-directory-relative path `cache/distribution`.
fn default_distribution_cache_root() -> PathBuf {
    /// Reads `key` from the environment, ignoring unset and empty values.
    fn non_empty_env(key: &str) -> Option<std::ffi::OsString> {
        std::env::var_os(key).filter(|value| !value.is_empty())
    }

    let base = non_empty_env("GREENTIC_HOME")
        .map(PathBuf::from)
        .or_else(|| non_empty_env("HOME").map(|home| PathBuf::from(home).join(".greentic")))
        .unwrap_or_else(|| PathBuf::from(".greentic"));
    base.join("cache").join("distribution")
}
2307
/// Errors produced by the distribution client.
///
/// `Display` messages come from the `#[error(...)]` attributes; see
/// `DistError::exit_code` for the process exit code associated with each variant.
#[derive(Debug, Error)]
pub enum DistError {
    /// The reference could not be used; also returned when an OCI resolution
    /// yields no result for the requested reference.
    #[error("invalid reference `{reference}`")]
    InvalidRef { reference: String },
    /// No artifact exists for the reference (cache miss, missing fixture file,
    /// or a resolved artifact without a usable payload).
    #[error("reference `{reference}` not found")]
    NotFound { reference: String },
    /// Access to `target` was denied; in this file it is also returned when a
    /// `repo://` or `store://` target cannot be mapped onto the registry base.
    #[error("unauthorized for `{target}`")]
    Unauthorized { target: String },
    /// The reference kind cannot be resolved with the current configuration
    /// (for example, no registry base is configured).
    #[error("resolution unavailable for `{reference}`")]
    ResolutionUnavailable { reference: String },
    /// A network-level failure; carries the stringified underlying error.
    #[error("network error: {0}")]
    Network(String),
    /// The artifact is inconsistent with what was expected, e.g. a lockfile
    /// digest that does not match the resolved descriptor.
    #[error("corrupt artifact `{reference}`: {reason}")]
    CorruptArtifact { reference: String, reason: String },
    /// The ABI version named in `abi` is not supported.
    #[error("unsupported abi `{abi}`")]
    UnsupportedAbi { abi: String },
    /// A filesystem operation on `path` failed; `source` is the I/O error.
    #[error("cache error at `{path}`: {source}")]
    CacheError {
        path: String,
        #[source]
        source: std::io::Error,
    },
    /// The caller supplied input that cannot be processed.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// The URL was rejected because it is not served over HTTPS.
    #[error("insecure url `{url}`: only https is allowed")]
    InsecureUrl { url: String },
    /// A fetch was required while offline mode is enabled.
    #[error("offline mode forbids fetching `{reference}`")]
    Offline { reference: String },
    /// A store authentication problem, described by the contained message.
    #[error("store auth error: {0}")]
    StoreAuth(String),
    /// An error from the OCI component resolver (converted via `From`).
    #[error("oci error: {0}")]
    Oci(#[from] crate::oci_components::OciComponentError),
    /// A `serde_json` error (converted via `From`); displayed as an invalid lockfile.
    #[error("invalid lockfile: {0}")]
    Serde(#[from] serde_json::Error),
}
2343
2344impl DistError {
2345    pub fn exit_code(&self) -> i32 {
2346        match self {
2347            DistError::InvalidRef { .. }
2348            | DistError::InvalidInput(_)
2349            | DistError::InsecureUrl { .. }
2350            | DistError::Serde(_) => 2,
2351            DistError::NotFound { .. } => 3,
2352            DistError::Offline { .. } => 4,
2353            DistError::Unauthorized { .. }
2354            | DistError::ResolutionUnavailable { .. }
2355            | DistError::StoreAuth(_) => 5,
2356            _ => 10,
2357        }
2358    }
2359}
2360
2361impl IntegrationError {
2362    fn from_dist_error(error: DistError) -> Self {
2363        match error {
2364            DistError::InvalidRef { reference } => Self {
2365                code: IntegrationErrorCode::InvalidReference,
2366                summary: format!("invalid reference `{reference}`"),
2367                retryable: false,
2368                details: Some(serde_json::json!({ "reference": reference })),
2369            },
2370            DistError::NotFound { reference } => Self {
2371                code: IntegrationErrorCode::CacheMiss,
2372                summary: format!("artifact `{reference}` was not found"),
2373                retryable: true,
2374                details: Some(serde_json::json!({ "reference": reference })),
2375            },
2376            DistError::Unauthorized { target } => Self {
2377                code: IntegrationErrorCode::ResolutionFailed,
2378                summary: format!("unauthorized for `{target}`"),
2379                retryable: false,
2380                details: Some(serde_json::json!({ "target": target })),
2381            },
2382            DistError::ResolutionUnavailable { reference } => Self {
2383                code: IntegrationErrorCode::ResolutionUnavailable,
2384                summary: format!("resolution unavailable for `{reference}`"),
2385                retryable: false,
2386                details: Some(serde_json::json!({ "reference": reference })),
2387            },
2388            DistError::Network(summary) => Self {
2389                code: IntegrationErrorCode::DownloadFailed,
2390                summary,
2391                retryable: true,
2392                details: None,
2393            },
2394            DistError::CorruptArtifact { reference, reason } => Self {
2395                code: IntegrationErrorCode::CacheCorrupt,
2396                summary: format!("corrupt artifact `{reference}`: {reason}"),
2397                retryable: false,
2398                details: Some(serde_json::json!({ "reference": reference, "reason": reason })),
2399            },
2400            DistError::UnsupportedAbi { abi } => Self {
2401                code: IntegrationErrorCode::UnsupportedArtifactType,
2402                summary: format!("unsupported abi `{abi}`"),
2403                retryable: false,
2404                details: Some(serde_json::json!({ "abi": abi })),
2405            },
2406            DistError::CacheError { path, source } => Self {
2407                code: IntegrationErrorCode::BundleOpenFailed,
2408                summary: format!("cache error at `{path}`: {source}"),
2409                retryable: true,
2410                details: Some(serde_json::json!({ "path": path })),
2411            },
2412            DistError::InvalidInput(summary) => Self {
2413                code: IntegrationErrorCode::PolicyInputInvalid,
2414                summary,
2415                retryable: false,
2416                details: None,
2417            },
2418            DistError::InsecureUrl { url } => Self {
2419                code: IntegrationErrorCode::UnsupportedSource,
2420                summary: format!("insecure url `{url}`: only https is allowed"),
2421                retryable: false,
2422                details: Some(serde_json::json!({ "url": url })),
2423            },
2424            DistError::Offline { reference } => Self {
2425                code: IntegrationErrorCode::OfflineRequiredButUnavailable,
2426                summary: format!("offline mode forbids fetching `{reference}`"),
2427                retryable: true,
2428                details: Some(serde_json::json!({ "reference": reference })),
2429            },
2430            DistError::StoreAuth(summary) => Self {
2431                code: IntegrationErrorCode::ResolutionFailed,
2432                summary,
2433                retryable: false,
2434                details: None,
2435            },
2436            DistError::Oci(source) => Self {
2437                code: IntegrationErrorCode::ResolutionFailed,
2438                summary: source.to_string(),
2439                retryable: true,
2440                details: None,
2441            },
2442            DistError::Serde(source) => Self {
2443                code: IntegrationErrorCode::DescriptorCorrupt,
2444                summary: format!("invalid serialized input: {source}"),
2445                retryable: false,
2446                details: None,
2447            },
2448        }
2449    }
2450
2451    fn from_verification_report(report: VerificationReport) -> Self {
2452        let code = verification_failure_code(&report);
2453        let summary = report
2454            .errors
2455            .first()
2456            .cloned()
2457            .or_else(|| report.warnings.first().cloned())
2458            .unwrap_or_else(|| "verification failed".to_string());
2459        Self {
2460            code,
2461            summary,
2462            retryable: false,
2463            details: Some(serde_json::json!({
2464                "artifact_digest": report.artifact_digest,
2465                "canonical_ref": report.canonical_ref,
2466                "errors": report.errors,
2467                "warnings": report.warnings,
2468                "failed_checks": report
2469                    .checks
2470                    .iter()
2471                    .filter(|check| matches!(check.outcome, VerificationOutcome::Failed))
2472                    .map(|check| serde_json::json!({
2473                        "name": check.name,
2474                        "detail": check.detail,
2475                        "payload": check.payload,
2476                    }))
2477                    .collect::<Vec<_>>(),
2478            })),
2479        }
2480    }
2481}
2482
2483impl ArtifactOpener for DefaultArtifactOpener {
2484    fn open(
2485        &self,
2486        artifact: &ResolvedArtifact,
2487        request: &ArtifactOpenRequest,
2488    ) -> Result<ArtifactOpenOutput, IntegrationError> {
2489        Ok(ArtifactOpenOutput {
2490            bundle_manifest_summary: manifest_summary_from_artifact(artifact),
2491            bundle_open_mode: if request.dry_run {
2492                BundleOpenMode::CacheReuse
2493            } else {
2494                BundleOpenMode::Userspace
2495            },
2496            warnings: if request.smoke_test {
2497                vec!["smoke test requested but no bundle runtime opener is configured".to_string()]
2498            } else {
2499                Vec::new()
2500            },
2501        })
2502    }
2503}
2504
/// Alias for [`DistError`]; both names refer to the same error type.
pub type ResolveError = DistError;
2506
2507#[derive(Clone, Debug)]
2508struct ComponentCache {
2509    base: PathBuf,
2510}
2511
2512impl ComponentCache {
2513    fn new(base: PathBuf) -> Self {
2514        Self { base }
2515    }
2516
2517    fn artifacts_root(&self) -> PathBuf {
2518        self.base.join("artifacts").join("sha256")
2519    }
2520
2521    fn bundle_records_root(&self) -> PathBuf {
2522        self.base.join("bundles")
2523    }
2524
2525    fn bundle_record_path(&self, bundle_id: &str) -> PathBuf {
2526        let safe = bundle_id.replace(':', "__");
2527        self.bundle_records_root().join(format!("{safe}.json"))
2528    }
2529
2530    fn component_dir(&self, digest: &str) -> PathBuf {
2531        let normalized = trim_digest_prefix(&normalize_digest(digest)).to_string();
2532        let (prefix, rest) = normalized.split_at(normalized.len().min(2));
2533        self.artifacts_root().join(prefix).join(rest)
2534    }
2535
2536    fn legacy_component_dir(&self, digest: &str) -> PathBuf {
2537        self.base
2538            .join(trim_digest_prefix(&normalize_digest(digest)))
2539    }
2540
2541    fn component_path(&self, digest: &str) -> PathBuf {
2542        self.component_dir(digest).join("blob")
2543    }
2544
2545    fn legacy_component_path(&self, digest: &str) -> PathBuf {
2546        self.legacy_component_dir(digest).join("component.wasm")
2547    }
2548
2549    fn entry_path(&self, digest: &str) -> PathBuf {
2550        self.component_dir(digest).join("entry.json")
2551    }
2552
2553    fn existing_component(&self, digest: &str) -> Option<PathBuf> {
2554        let path = self.component_path(digest);
2555        if path.exists() {
2556            let _ = self.touch_last_used(digest);
2557            Some(path)
2558        } else {
2559            let legacy = self.legacy_component_path(digest);
2560            if legacy.exists() {
2561                let _ = self.touch_last_used(digest);
2562                Some(legacy)
2563            } else {
2564                None
2565            }
2566        }
2567    }
2568
2569    fn write_component(&self, digest: &str, data: &[u8]) -> Result<PathBuf, std::io::Error> {
2570        let dir = self.component_dir(digest);
2571        fs::create_dir_all(&dir)?;
2572        let path = dir.join("blob");
2573        fs::write(&path, data)?;
2574        self.touch_last_used(digest)?;
2575        Ok(path)
2576    }
2577
2578    fn write_entry(&self, entry: &CacheEntry) -> Result<(), std::io::Error> {
2579        let path = self.entry_path(&entry.digest);
2580        let bytes = serde_json::to_vec_pretty(entry)
2581            .map_err(|err| std::io::Error::other(err.to_string()))?;
2582        fs::write(path, bytes)
2583    }
2584
2585    fn read_entry(&self, digest: &str) -> Result<CacheEntry, std::io::Error> {
2586        let path = self.entry_path(digest);
2587        let bytes = fs::read(path)?;
2588        serde_json::from_slice(&bytes).map_err(|err| std::io::Error::other(err.to_string()))
2589    }
2590
2591    fn write_bundle_record(&self, record: &BundleRecord) -> Result<(), std::io::Error> {
2592        let path = self.bundle_record_path(&record.bundle_id);
2593        if let Some(parent) = path.parent() {
2594            fs::create_dir_all(parent)?;
2595        }
2596        let bytes = serde_json::to_vec_pretty(record)
2597            .map_err(|err| std::io::Error::other(err.to_string()))?;
2598        fs::write(path, bytes)
2599    }
2600
2601    fn read_bundle_record(&self, bundle_id: &str) -> Result<BundleRecord, std::io::Error> {
2602        let path = self.bundle_record_path(bundle_id);
2603        let bytes = fs::read(path)?;
2604        serde_json::from_slice(&bytes).map_err(|err| std::io::Error::other(err.to_string()))
2605    }
2606
2607    fn remove_bundle_record(&self, bundle_id: &str) -> Result<(), std::io::Error> {
2608        let path = self.bundle_record_path(bundle_id);
2609        if path.exists() {
2610            fs::remove_file(path)
2611        } else {
2612            Ok(())
2613        }
2614    }
2615
2616    fn list_bundle_records(&self) -> Result<Vec<BundleRecord>, std::io::Error> {
2617        let root = self.bundle_records_root();
2618        let mut out = Vec::new();
2619        let Ok(entries) = fs::read_dir(root) else {
2620            return Ok(out);
2621        };
2622        for entry in entries.flatten() {
2623            let path = entry.path();
2624            if !path.is_file() {
2625                continue;
2626            }
2627            let bytes = fs::read(&path)?;
2628            let record = serde_json::from_slice::<BundleRecord>(&bytes)
2629                .map_err(|err| std::io::Error::other(format!("{}: {err}", path.display())))?;
2630            out.push(record);
2631        }
2632        out.sort_by(|a, b| a.bundle_id.cmp(&b.bundle_id));
2633        Ok(out)
2634    }
2635
2636    fn list_digests(&self) -> Vec<String> {
2637        let mut digests = Vec::new();
2638        self.collect_digests(&self.base, &mut digests);
2639        let root = self.artifacts_root();
2640        if root != self.base {
2641            self.collect_digests(&root, &mut digests);
2642        }
2643        digests.sort();
2644        digests.dedup();
2645        digests
2646    }
2647
2648    fn collect_digests(&self, dir: &Path, digests: &mut Vec<String>) {
2649        let _ = &self.base;
2650        if let Ok(entries) = fs::read_dir(dir) {
2651            for entry in entries.flatten() {
2652                let path = entry.path();
2653                if path.is_dir() {
2654                    let blob = path.join("blob");
2655                    let legacy_blob = path.join("component.wasm");
2656                    if blob.exists() || legacy_blob.exists() {
2657                        if let Some(digest) = digest_from_component_dir(&path) {
2658                            digests.push(digest);
2659                        }
2660                    } else {
2661                        self.collect_digests(&path, digests);
2662                    }
2663                }
2664            }
2665        }
2666    }
2667
2668    fn touch_last_used(&self, digest: &str) -> Result<(), std::io::Error> {
2669        let marker = self.component_dir(digest).join("last_used");
2670        let seq = LAST_USED_COUNTER.fetch_add(1, Ordering::Relaxed);
2671        fs::write(marker, seq.to_string())
2672    }
2673}
2674
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
2681
2682fn artifact_source_from_reference(
2683    reference: &str,
2684    opts: &DistOptions,
2685) -> Result<ArtifactSource, DistError> {
2686    let kind = match classify_reference(reference)? {
2687        RefKind::Digest(_) => ArtifactSourceKind::CacheDigest,
2688        RefKind::Http(_) => ArtifactSourceKind::Https,
2689        RefKind::File(_) => ArtifactSourceKind::File,
2690        RefKind::Oci(_) => ArtifactSourceKind::Oci,
2691        RefKind::Repo(_) => ArtifactSourceKind::Repo,
2692        RefKind::Store(_) => ArtifactSourceKind::Store,
2693        #[cfg(feature = "fixture-resolver")]
2694        RefKind::Fixture(_) => ArtifactSourceKind::Fixture,
2695    };
2696    Ok(ArtifactSource {
2697        raw_ref: reference.to_string(),
2698        kind: kind.clone(),
2699        transport_hints: TransportHints {
2700            offline: opts.offline,
2701            allow_insecure_local_http: opts.allow_insecure_local_http,
2702        },
2703        dev_mode: matches!(kind, ArtifactSourceKind::Fixture | ArtifactSourceKind::File),
2704    })
2705}
2706
2707fn canonical_oci_ref(reference: &str, digest: &str) -> String {
2708    let repo = canonical_oci_component_id(reference);
2709    format!("oci://{repo}@{digest}")
2710}
2711
/// Reconstructs a `sha256:<hex>` digest from a cache directory path.
///
/// Two layouts are recognised:
/// - legacy: the final path segment is a full 64-character hex digest;
/// - sharded: the parent segment is a two-character hex shard and the final
///   segment holds the remaining hex characters.
///
/// Returns `None` when a segment is missing, is not valid UTF-8, or is not
/// hexadecimal. The shard segment must be hex as well, so an unrelated
/// two-character directory name cannot produce a bogus digest.
fn digest_from_component_dir(dir: &Path) -> Option<String> {
    fn is_hex(text: &str) -> bool {
        !text.is_empty() && text.chars().all(|ch| ch.is_ascii_hexdigit())
    }

    let leaf = dir.file_name()?.to_str()?;
    if !is_hex(leaf) {
        return None;
    }
    if leaf.len() == 64 {
        return Some(format!("sha256:{leaf}"));
    }
    let parent = dir.parent()?.file_name()?.to_str()?;
    (parent.len() == 2 && is_hex(parent)).then(|| format!("sha256:{parent}{leaf}"))
}
2724
2725fn cache_entry_state_from_integrity(state: &IntegrityState) -> CacheEntryState {
2726    match state {
2727        IntegrityState::Partial => CacheEntryState::Partial,
2728        IntegrityState::Ready => CacheEntryState::Ready,
2729        IntegrityState::Corrupt => CacheEntryState::Corrupt,
2730        IntegrityState::Evicted => CacheEntryState::Evicted,
2731    }
2732}
2733
2734fn integrity_state_from_entry(state: &CacheEntryState) -> IntegrityState {
2735    match state {
2736        CacheEntryState::Partial => IntegrityState::Partial,
2737        CacheEntryState::Ready => IntegrityState::Ready,
2738        CacheEntryState::Corrupt => IntegrityState::Corrupt,
2739        CacheEntryState::Evicted => IntegrityState::Evicted,
2740    }
2741}
2742
2743fn descriptor_from_entry(entry: &CacheEntry) -> ArtifactDescriptor {
2744    ArtifactDescriptor {
2745        artifact_type: entry.artifact_type.clone(),
2746        source_kind: entry.source_kind.clone(),
2747        raw_ref: entry.raw_ref.clone(),
2748        canonical_ref: entry.canonical_ref.clone(),
2749        digest: entry.digest.clone(),
2750        media_type: entry.media_type.clone(),
2751        size_bytes: entry.size_bytes,
2752        created_at: None,
2753        annotations: serde_json::Map::new(),
2754        manifest_digest: None,
2755        resolved_via: match entry.source_kind {
2756            ArtifactSourceKind::Repo => ResolvedVia::RepoMapping,
2757            ArtifactSourceKind::Store => ResolvedVia::StoreMapping,
2758            ArtifactSourceKind::Fixture => ResolvedVia::Fixture,
2759            ArtifactSourceKind::File => ResolvedVia::File,
2760            ArtifactSourceKind::Https => ResolvedVia::Https,
2761            ArtifactSourceKind::CacheDigest => ResolvedVia::CacheDigest,
2762            ArtifactSourceKind::Oci => ResolvedVia::Direct,
2763        },
2764        signature_refs: Vec::new(),
2765        sbom_refs: Vec::new(),
2766    }
2767}
2768
2769fn legacy_source_from_entry(entry: &CacheEntry) -> LegacyArtifactSource {
2770    match entry.source_kind {
2771        ArtifactSourceKind::CacheDigest => LegacyArtifactSource::Digest,
2772        ArtifactSourceKind::Https => LegacyArtifactSource::Http(entry.raw_ref.clone()),
2773        ArtifactSourceKind::File | ArtifactSourceKind::Fixture => {
2774            LegacyArtifactSource::File(PathBuf::from(entry.raw_ref.clone()))
2775        }
2776        ArtifactSourceKind::Oci => {
2777            LegacyArtifactSource::Oci(entry.canonical_ref.trim_start_matches("oci://").to_string())
2778        }
2779        ArtifactSourceKind::Repo => LegacyArtifactSource::Repo(entry.raw_ref.clone()),
2780        ArtifactSourceKind::Store => LegacyArtifactSource::Store(entry.raw_ref.clone()),
2781    }
2782}
2783
2784fn component_id_from_descriptor(entry: &CacheEntry) -> String {
2785    component_id_from_ref(&match entry.source_kind {
2786        ArtifactSourceKind::CacheDigest => RefKind::Digest(entry.digest.clone()),
2787        ArtifactSourceKind::Https => RefKind::Http(entry.raw_ref.clone()),
2788        ArtifactSourceKind::File | ArtifactSourceKind::Fixture => {
2789            RefKind::File(PathBuf::from(entry.raw_ref.clone()))
2790        }
2791        ArtifactSourceKind::Oci => {
2792            RefKind::Oci(entry.canonical_ref.trim_start_matches("oci://").to_string())
2793        }
2794        ArtifactSourceKind::Repo => RefKind::Repo(entry.raw_ref.clone()),
2795        ArtifactSourceKind::Store => RefKind::Store(entry.raw_ref.clone()),
2796    })
2797}
2798
2799fn verification_outcome_name(outcome: &VerificationOutcome) -> &'static str {
2800    match outcome {
2801        VerificationOutcome::Passed => "passed",
2802        VerificationOutcome::Failed => "failed",
2803        VerificationOutcome::Warning => "warning",
2804        VerificationOutcome::Skipped => "skipped",
2805    }
2806}
2807
2808fn make_check(
2809    name: &str,
2810    outcome: VerificationOutcome,
2811    detail: impl Into<String>,
2812    payload: Option<serde_json::Value>,
2813) -> VerificationCheck {
2814    VerificationCheck {
2815        name: name.to_string(),
2816        outcome,
2817        detail: detail.into(),
2818        payload,
2819    }
2820}
2821
2822fn preliminary_decision_from_checks(checks: Vec<VerificationCheck>) -> PreliminaryDecision {
2823    let warnings = checks
2824        .iter()
2825        .filter(|check| matches!(check.outcome, VerificationOutcome::Warning))
2826        .map(|check| check.detail.clone())
2827        .collect::<Vec<_>>();
2828    let errors = checks
2829        .iter()
2830        .filter(|check| matches!(check.outcome, VerificationOutcome::Failed))
2831        .map(|check| check.detail.clone())
2832        .collect::<Vec<_>>();
2833    PreliminaryDecision {
2834        passed: errors.is_empty(),
2835        checks,
2836        warnings,
2837        errors,
2838    }
2839}
2840
2841fn verification_report_from_checks(
2842    descriptor: &ArtifactDescriptor,
2843    advisory_set: Option<&AdvisorySet>,
2844    verification_policy: &VerificationPolicy,
2845    cache_entry: Option<&CacheEntry>,
2846    checks: Vec<VerificationCheck>,
2847) -> VerificationReport {
2848    let warnings = checks
2849        .iter()
2850        .filter(|check| matches!(check.outcome, VerificationOutcome::Warning))
2851        .map(|check| check.detail.clone())
2852        .collect::<Vec<_>>();
2853    let errors = checks
2854        .iter()
2855        .filter(|check| matches!(check.outcome, VerificationOutcome::Failed))
2856        .map(|check| check.detail.clone())
2857        .collect::<Vec<_>>();
2858
2859    VerificationReport {
2860        artifact_digest: descriptor.digest.clone(),
2861        canonical_ref: descriptor.canonical_ref.clone(),
2862        passed: errors.is_empty(),
2863        warnings,
2864        errors,
2865        policy_fingerprint: policy_fingerprint(verification_policy),
2866        advisory_version: advisory_set.map(|advisory| advisory.version.clone()),
2867        cache_entry_fingerprint: cache_entry.map(cache_entry_fingerprint),
2868        checks,
2869    }
2870}
2871
2872fn policy_fingerprint(policy: &VerificationPolicy) -> String {
2873    let bytes = serde_json::to_vec(policy).unwrap_or_default();
2874    digest_for_bytes(&bytes)
2875}
2876
2877fn cache_entry_fingerprint(entry: &CacheEntry) -> String {
2878    let bytes = serde_json::to_vec(entry).unwrap_or_default();
2879    digest_for_bytes(&bytes)
2880}
2881
2882fn issuer_from_descriptor(descriptor: &ArtifactDescriptor) -> Option<String> {
2883    descriptor
2884        .annotations
2885        .get("issuer")
2886        .and_then(|value| value.as_str())
2887        .map(|raw| raw.to_string())
2888}
2889
2890fn minimum_operator_version_from_descriptor(descriptor: &ArtifactDescriptor) -> Option<String> {
2891    descriptor
2892        .annotations
2893        .get("minimum_operator_version")
2894        .and_then(|value| value.as_str())
2895        .map(|raw| raw.to_string())
2896}
2897
2898fn check_digest_allowed(
2899    digest: &str,
2900    advisory_set: Option<&AdvisorySet>,
2901    verification_policy: &VerificationPolicy,
2902) -> VerificationCheck {
2903    let denied = verification_policy
2904        .deny_digests
2905        .iter()
2906        .chain(
2907            advisory_set
2908                .into_iter()
2909                .flat_map(|advisory| advisory.deny_digests.iter()),
2910        )
2911        .any(|candidate| candidate == digest);
2912    if denied {
2913        make_check(
2914            "digest_allowed",
2915            VerificationOutcome::Failed,
2916            format!("digest {digest} is denied by policy or advisory"),
2917            Some(serde_json::json!({
2918                "digest": digest,
2919                "advisory_version": advisory_set.map(|advisory| advisory.version.clone()),
2920            })),
2921        )
2922    } else {
2923        make_check(
2924            "digest_allowed",
2925            VerificationOutcome::Passed,
2926            format!("digest {digest} is allowed"),
2927            Some(serde_json::json!({
2928                "digest": digest,
2929            })),
2930        )
2931    }
2932}
2933
2934fn check_media_type_allowed(
2935    media_type: &str,
2936    verification_policy: &VerificationPolicy,
2937) -> VerificationCheck {
2938    if verification_policy.allowed_media_types.is_empty() {
2939        return make_check(
2940            "media_type_allowed",
2941            VerificationOutcome::Skipped,
2942            "no media type allowlist configured",
2943            Some(serde_json::json!({
2944                "media_type": media_type,
2945            })),
2946        );
2947    }
2948    if verification_policy
2949        .allowed_media_types
2950        .iter()
2951        .any(|candidate| candidate == media_type)
2952    {
2953        make_check(
2954            "media_type_allowed",
2955            VerificationOutcome::Passed,
2956            format!("media type {media_type} is allowed"),
2957            Some(serde_json::json!({
2958                "media_type": media_type,
2959            })),
2960        )
2961    } else {
2962        make_check(
2963            "media_type_allowed",
2964            VerificationOutcome::Failed,
2965            format!("media type {media_type} is not allowed"),
2966            Some(serde_json::json!({
2967                "media_type": media_type,
2968                "allowed_media_types": verification_policy.allowed_media_types,
2969            })),
2970        )
2971    }
2972}
2973
2974fn check_issuer_allowed(
2975    issuer: Option<String>,
2976    advisory_set: Option<&AdvisorySet>,
2977    verification_policy: &VerificationPolicy,
2978) -> VerificationCheck {
2979    let Some(issuer) = issuer else {
2980        return make_check(
2981            "issuer_allowed",
2982            VerificationOutcome::Warning,
2983            "issuer metadata is missing",
2984            Some(serde_json::json!({
2985                "issuer": null,
2986                "advisory_version": advisory_set.map(|advisory| advisory.version.clone()),
2987            })),
2988        );
2989    };
2990    if verification_policy
2991        .deny_issuers
2992        .iter()
2993        .chain(
2994            advisory_set
2995                .into_iter()
2996                .flat_map(|advisory| advisory.deny_issuers.iter()),
2997        )
2998        .any(|candidate| candidate == &issuer)
2999    {
3000        return make_check(
3001            "issuer_allowed",
3002            VerificationOutcome::Failed,
3003            format!("issuer {issuer} is denied"),
3004            Some(serde_json::json!({
3005                "issuer": issuer,
3006                "advisory_version": advisory_set.map(|advisory| advisory.version.clone()),
3007            })),
3008        );
3009    }
3010    if !verification_policy.trusted_issuers.is_empty()
3011        && !verification_policy
3012            .trusted_issuers
3013            .iter()
3014            .any(|candidate| candidate == &issuer)
3015    {
3016        return make_check(
3017            "issuer_allowed",
3018            VerificationOutcome::Warning,
3019            format!("issuer {issuer} is not on the trusted issuer allowlist"),
3020            Some(serde_json::json!({
3021                "issuer": issuer,
3022                "trusted_issuers": verification_policy.trusted_issuers,
3023            })),
3024        );
3025    }
3026    make_check(
3027        "issuer_allowed",
3028        VerificationOutcome::Passed,
3029        format!("issuer {issuer} is allowed"),
3030        Some(serde_json::json!({
3031            "issuer": issuer,
3032        })),
3033    )
3034}
3035
3036fn check_operator_version_compatible(
3037    descriptor: &ArtifactDescriptor,
3038    advisory_set: Option<&AdvisorySet>,
3039    verification_policy: &VerificationPolicy,
3040) -> VerificationCheck {
3041    let required = verification_policy
3042        .minimum_operator_version
3043        .clone()
3044        .or_else(|| advisory_set.and_then(|advisory| advisory.minimum_operator_version.clone()))
3045        .or_else(|| minimum_operator_version_from_descriptor(descriptor));
3046    let Some(required) = required else {
3047        return make_check(
3048            "operator_version_compatible",
3049            VerificationOutcome::Skipped,
3050            "no minimum operator version requirement present",
3051            Some(serde_json::json!({
3052                "required": null,
3053                "actual": descriptor
3054                    .annotations
3055                    .get("operator_version")
3056                    .and_then(|value| value.as_str()),
3057            })),
3058        );
3059    };
3060
3061    let actual = descriptor
3062        .annotations
3063        .get("operator_version")
3064        .and_then(|value| value.as_str())
3065        .map(|raw| raw.to_string());
3066    let Some(actual) = actual else {
3067        return make_check(
3068            "operator_version_compatible",
3069            VerificationOutcome::Warning,
3070            format!(
3071                "minimum operator version {required} is declared but actual operator version is unknown"
3072            ),
3073            Some(serde_json::json!({
3074                "required": required,
3075                "actual": null,
3076            })),
3077        );
3078    };
3079
3080    let required_parsed = semver::Version::parse(&required);
3081    let actual_parsed = semver::Version::parse(&actual);
3082    match (required_parsed, actual_parsed) {
3083        (Ok(required), Ok(actual)) if actual >= required => make_check(
3084            "operator_version_compatible",
3085            VerificationOutcome::Passed,
3086            format!("operator version {actual} satisfies minimum {required}"),
3087            Some(serde_json::json!({
3088                "required": required.to_string(),
3089                "actual": actual.to_string(),
3090            })),
3091        ),
3092        (Ok(required), Ok(actual)) => {
3093            let outcome = match verification_policy.environment {
3094                VerificationEnvironment::Dev => VerificationOutcome::Warning,
3095                VerificationEnvironment::Staging | VerificationEnvironment::Prod => {
3096                    VerificationOutcome::Failed
3097                }
3098            };
3099            make_check(
3100                "operator_version_compatible",
3101                outcome,
3102                format!("operator version {actual} does not satisfy minimum {required}"),
3103                Some(serde_json::json!({
3104                    "required": required.to_string(),
3105                    "actual": actual.to_string(),
3106                    "environment": verification_environment_name(&verification_policy.environment),
3107                })),
3108            )
3109        }
3110        _ => make_check(
3111            "operator_version_compatible",
3112            VerificationOutcome::Warning,
3113            format!(
3114                "operator version metadata is not parseable (actual={actual}, required={required})"
3115            ),
3116            Some(serde_json::json!({
3117                "required": required,
3118                "actual": actual,
3119            })),
3120        ),
3121    }
3122}
3123
3124fn check_content_digest_match(
3125    resolved_artifact: &ResolvedArtifact,
3126) -> Result<VerificationCheck, DistError> {
3127    let bytes = resolved_artifact.wasm_bytes()?;
3128    let computed = digest_for_bytes(bytes);
3129    Ok(if computed == resolved_artifact.descriptor.digest {
3130        make_check(
3131            "content_digest_match",
3132            VerificationOutcome::Passed,
3133            format!(
3134                "content digest matches {}",
3135                resolved_artifact.descriptor.digest
3136            ),
3137            None,
3138        )
3139    } else {
3140        make_check(
3141            "content_digest_match",
3142            VerificationOutcome::Failed,
3143            format!(
3144                "content digest {} did not match descriptor {}",
3145                computed, resolved_artifact.descriptor.digest
3146            ),
3147            Some(serde_json::json!({
3148                "expected": resolved_artifact.descriptor.digest,
3149                "actual": computed,
3150            })),
3151        )
3152    })
3153}
3154
3155fn check_signature_present(
3156    descriptor: &ArtifactDescriptor,
3157    verification_policy: &VerificationPolicy,
3158) -> VerificationCheck {
3159    if descriptor.signature_refs.is_empty() {
3160        let outcome = match verification_policy.environment {
3161            VerificationEnvironment::Dev => VerificationOutcome::Warning,
3162            VerificationEnvironment::Staging | VerificationEnvironment::Prod => {
3163                if verification_policy.require_signature {
3164                    VerificationOutcome::Failed
3165                } else {
3166                    VerificationOutcome::Warning
3167                }
3168            }
3169        };
3170        return make_check(
3171            "signature_present",
3172            outcome,
3173            "no signature references are present",
3174            Some(serde_json::json!({
3175                "count": 0,
3176                "required": verification_policy.require_signature,
3177                "environment": verification_environment_name(&verification_policy.environment),
3178            })),
3179        );
3180    }
3181    make_check(
3182        "signature_present",
3183        VerificationOutcome::Passed,
3184        "signature references are present",
3185        Some(serde_json::json!({
3186            "count": descriptor.signature_refs.len(),
3187            "required": verification_policy.require_signature,
3188            "environment": verification_environment_name(&verification_policy.environment),
3189        })),
3190    )
3191}
3192
3193fn check_signature_verified(
3194    descriptor: &ArtifactDescriptor,
3195    verification_policy: &VerificationPolicy,
3196) -> VerificationCheck {
3197    let detail = if descriptor.signature_refs.is_empty() {
3198        "signature verification could not run because no signature references are present"
3199    } else {
3200        "signature verification is not implemented in the open-source client"
3201    };
3202    let outcome = if verification_policy.require_signature {
3203        match verification_policy.environment {
3204            VerificationEnvironment::Dev => VerificationOutcome::Warning,
3205            VerificationEnvironment::Staging | VerificationEnvironment::Prod => {
3206                VerificationOutcome::Failed
3207            }
3208        }
3209    } else if descriptor.signature_refs.is_empty() {
3210        VerificationOutcome::Skipped
3211    } else {
3212        VerificationOutcome::Warning
3213    };
3214    make_check(
3215        "signature_verified",
3216        outcome,
3217        detail,
3218        Some(serde_json::json!({
3219            "implemented": false,
3220            "signature_count": descriptor.signature_refs.len(),
3221            "required": verification_policy.require_signature,
3222            "environment": verification_environment_name(&verification_policy.environment),
3223        })),
3224    )
3225}
3226
3227fn check_sbom_present(
3228    descriptor: &ArtifactDescriptor,
3229    verification_policy: &VerificationPolicy,
3230) -> VerificationCheck {
3231    if descriptor.sbom_refs.is_empty() {
3232        return make_check(
3233            "sbom_present",
3234            if verification_policy.require_sbom {
3235                VerificationOutcome::Failed
3236            } else {
3237                VerificationOutcome::Warning
3238            },
3239            "no SBOM references are present",
3240            Some(serde_json::json!({
3241                "count": 0,
3242                "required": verification_policy.require_sbom,
3243            })),
3244        );
3245    }
3246    make_check(
3247        "sbom_present",
3248        VerificationOutcome::Passed,
3249        "SBOM references are present",
3250        Some(serde_json::json!({
3251            "count": descriptor.sbom_refs.len(),
3252            "required": verification_policy.require_sbom,
3253        })),
3254    )
3255}
3256
3257fn verification_environment_name(environment: &VerificationEnvironment) -> &'static str {
3258    match environment {
3259        VerificationEnvironment::Dev => "dev",
3260        VerificationEnvironment::Staging => "staging",
3261        VerificationEnvironment::Prod => "prod",
3262    }
3263}
3264
3265fn bundle_id_for_digest(digest: &str) -> String {
3266    format!("bundle:{}", normalize_digest(digest))
3267}
3268
3269fn digest_from_bundle_id(bundle_id: &str) -> Option<String> {
3270    bundle_id
3271        .strip_prefix("bundle:")
3272        .map(normalize_digest)
3273        .filter(|digest| digest.starts_with("sha256:"))
3274}
3275
3276fn manifest_summary_from_artifact(artifact: &ResolvedArtifact) -> BundleManifestSummary {
3277    BundleManifestSummary {
3278        component_id: artifact.component_id.clone(),
3279        abi_version: artifact.abi_version.clone(),
3280        describe_artifact_ref: artifact.describe_artifact_ref.clone(),
3281        artifact_type: artifact.descriptor.artifact_type.clone(),
3282        media_type: artifact.descriptor.media_type.clone(),
3283        size_bytes: artifact.descriptor.size_bytes,
3284    }
3285}
3286
3287fn verification_failure_code(report: &VerificationReport) -> IntegrationErrorCode {
3288    for check in &report.checks {
3289        if !matches!(check.outcome, VerificationOutcome::Failed) {
3290            continue;
3291        }
3292        return match check.name.as_str() {
3293            "digest_allowed" => IntegrationErrorCode::DigestDenied,
3294            "media_type_allowed" => IntegrationErrorCode::MediaTypeRejected,
3295            "issuer_allowed" => IntegrationErrorCode::IssuerRejected,
3296            "content_digest_match" => IntegrationErrorCode::DigestMismatch,
3297            "signature_present" | "signature_verified" => IntegrationErrorCode::SignatureRequired,
3298            "operator_version_compatible" => IntegrationErrorCode::VerificationFailed,
3299            "sbom_present" => IntegrationErrorCode::VerificationFailed,
3300            _ => IntegrationErrorCode::VerificationFailed,
3301        };
3302    }
3303    IntegrationErrorCode::VerificationFailed
3304}
3305
3306fn retention_decisions(input: &RetentionInput) -> Vec<RetentionDecision> {
3307    let mut decisions = Vec::with_capacity(input.entries.len());
3308    let protected_active = input
3309        .active_bundle_ids
3310        .iter()
3311        .cloned()
3312        .collect::<std::collections::BTreeSet<_>>();
3313    let protected_staged = input
3314        .staged_bundle_ids
3315        .iter()
3316        .cloned()
3317        .collect::<std::collections::BTreeSet<_>>();
3318    let protected_warming = input
3319        .warming_bundle_ids
3320        .iter()
3321        .cloned()
3322        .collect::<std::collections::BTreeSet<_>>();
3323    let protected_ready = input
3324        .ready_bundle_ids
3325        .iter()
3326        .cloned()
3327        .collect::<std::collections::BTreeSet<_>>();
3328    let protected_draining = input
3329        .draining_bundle_ids
3330        .iter()
3331        .cloned()
3332        .collect::<std::collections::BTreeSet<_>>();
3333    let protected_session = input
3334        .session_referenced_bundle_ids
3335        .iter()
3336        .cloned()
3337        .collect::<std::collections::BTreeSet<_>>();
3338    let protected_rollback = rollback_protected_bundle_ids(
3339        &input.active_bundle_ids,
3340        &input.staged_bundle_ids,
3341        &input.entries,
3342        input.minimum_rollback_depth,
3343    );
3344
3345    let now = unix_now();
3346    let mut candidate_indices = Vec::new();
3347    let mut total_bytes = input
3348        .entries
3349        .iter()
3350        .map(|entry| entry.size_bytes)
3351        .sum::<u64>();
3352
3353    for (index, entry) in input.entries.iter().enumerate() {
3354        let bundle_id = bundle_id_for_digest(&entry.digest);
3355        let protection = if protected_active.contains(&bundle_id) {
3356            Some(("active_bundle", "bundle is currently active".to_string()))
3357        } else if protected_session.contains(&bundle_id) {
3358            Some((
3359                "session_reference",
3360                "bundle is referenced by a live session".to_string(),
3361            ))
3362        } else if protected_warming.contains(&bundle_id) {
3363            Some(("warming_bundle", "bundle is currently warming".to_string()))
3364        } else if protected_ready.contains(&bundle_id) {
3365            Some(("ready_bundle", "bundle is currently ready".to_string()))
3366        } else if protected_draining.contains(&bundle_id) {
3367            Some((
3368                "draining_bundle",
3369                "bundle is currently draining".to_string(),
3370            ))
3371        } else if protected_staged.contains(&bundle_id) {
3372            Some(("staged_bundle", "bundle is currently staged".to_string()))
3373        } else if protected_rollback.contains(&bundle_id) {
3374            Some((
3375                "rollback_depth",
3376                "bundle is protected for rollback depth".to_string(),
3377            ))
3378        } else {
3379            None
3380        };
3381
3382        if let Some((reason_code, reason_detail)) = protection {
3383            decisions.push(RetentionDecision {
3384                cache_key: entry.cache_key.clone(),
3385                bundle_id,
3386                decision: RetentionDisposition::Protect,
3387                reason_code: reason_code.to_string(),
3388                reason_detail,
3389            });
3390            continue;
3391        }
3392
3393        decisions.push(RetentionDecision {
3394            cache_key: entry.cache_key.clone(),
3395            bundle_id,
3396            decision: RetentionDisposition::Keep,
3397            reason_code: "within_policy".to_string(),
3398            reason_detail: "entry remains available".to_string(),
3399        });
3400        candidate_indices.push(index);
3401    }
3402
3403    let mut candidate_order = candidate_indices
3404        .into_iter()
3405        .map(|index| {
3406            let entry = &input.entries[index];
3407            let is_corrupt = matches!(entry.state, CacheEntryState::Corrupt);
3408            let rollback_eligible =
3409                protected_rollback.contains(&bundle_id_for_digest(&entry.digest));
3410            let age_secs = now.saturating_sub(entry.last_accessed_at);
3411            (index, is_corrupt, rollback_eligible, age_secs)
3412        })
3413        .collect::<Vec<_>>();
3414    candidate_order.sort_by(|a, b| {
3415        b.1.cmp(&a.1)
3416            .then_with(|| a.2.cmp(&b.2))
3417            .then_with(|| b.3.cmp(&a.3))
3418            .then_with(|| {
3419                input.entries[a.0]
3420                    .cache_key
3421                    .cmp(&input.entries[b.0].cache_key)
3422            })
3423    });
3424
3425    for (index, is_corrupt, _, age_secs) in candidate_order {
3426        let entry = &input.entries[index];
3427        let aged_out = input
3428            .max_entry_age
3429            .is_some_and(|max_age| age_secs > max_age);
3430        let over_budget = input.max_cache_bytes > 0 && total_bytes > input.max_cache_bytes;
3431        if !is_corrupt && !aged_out && !over_budget {
3432            continue;
3433        }
3434
3435        let decision = &mut decisions[index];
3436        decision.decision = RetentionDisposition::Evict;
3437        if is_corrupt {
3438            decision.reason_code = "corrupt_entry".to_string();
3439            decision.reason_detail = "corrupt entry can be evicted safely".to_string();
3440        } else if aged_out {
3441            decision.reason_code = "max_age_exceeded".to_string();
3442            decision.reason_detail = "entry exceeded retention age".to_string();
3443        } else {
3444            decision.reason_code = "cache_budget".to_string();
3445            decision.reason_detail =
3446                "entry selected for eviction under cache budget pressure".to_string();
3447        }
3448        total_bytes = total_bytes.saturating_sub(entry.size_bytes);
3449    }
3450
3451    decisions
3452}
3453
3454fn rollback_protected_bundle_ids(
3455    active_bundle_ids: &[String],
3456    staged_bundle_ids: &[String],
3457    entries: &[CacheEntry],
3458    minimum_rollback_depth: usize,
3459) -> std::collections::BTreeSet<String> {
3460    if minimum_rollback_depth == 0 {
3461        return std::collections::BTreeSet::new();
3462    }
3463    let active = active_bundle_ids
3464        .iter()
3465        .cloned()
3466        .collect::<std::collections::BTreeSet<_>>();
3467    let mut protected = std::collections::BTreeSet::new();
3468    let ordered_bundle_ids = if staged_bundle_ids.is_empty() {
3469        let mut entries = entries.iter().collect::<Vec<_>>();
3470        entries.sort_by(|a, b| {
3471            a.fetched_at
3472                .cmp(&b.fetched_at)
3473                .then_with(|| a.cache_key.cmp(&b.cache_key))
3474        });
3475        entries
3476            .into_iter()
3477            .map(|entry| bundle_id_for_digest(&entry.digest))
3478            .collect::<Vec<_>>()
3479    } else {
3480        staged_bundle_ids.to_vec()
3481    };
3482    for bundle_id in ordered_bundle_ids.iter().rev() {
3483        if active.contains(bundle_id) {
3484            continue;
3485        }
3486        protected.insert(bundle_id.clone());
3487        if protected.len() >= minimum_rollback_depth {
3488            break;
3489        }
3490    }
3491    protected
3492}
3493
3494fn digest_for_bytes(bytes: &[u8]) -> String {
3495    let mut hasher = Sha256::new();
3496    hasher.update(bytes);
3497    format!("sha256:{:x}", hasher.finalize())
3498}
3499
/// Returns the bare hex part of a digest string.
///
/// Leading `@` separators (as in `repo@sha256:...`) are dropped first, then a
/// `sha256:` prefix is removed when present. Doing it in this order means
/// `@sha256:abc` yields `abc` instead of keeping the algorithm prefix.
/// Input without either decoration is returned unchanged.
fn trim_digest_prefix(digest: &str) -> &str {
    let bare = digest.trim_start_matches('@');
    bare.strip_prefix("sha256:").unwrap_or(bare)
}
3505
/// Ensures a digest string carries the `sha256:` prefix, adding it when absent.
fn normalize_digest(digest: &str) -> String {
    match digest.strip_prefix("sha256:") {
        Some(_) => digest.to_owned(),
        None => ["sha256:", digest].concat(),
    }
}
3513
3514fn component_id_from_ref(kind: &RefKind) -> String {
3515    match kind {
3516        RefKind::Digest(digest) => digest.clone(),
3517        RefKind::Http(url) => Url::parse(url)
3518            .ok()
3519            .and_then(|u| {
3520                Path::new(u.path())
3521                    .file_stem()
3522                    .and_then(|stem| stem.to_str())
3523                    .map(strip_file_component_suffix)
3524            })
3525            .filter(|id| !id.is_empty())
3526            .unwrap_or_else(|| "http-component".to_string()),
3527        RefKind::File(path) => path
3528            .file_stem()
3529            .and_then(|stem| stem.to_str())
3530            .map(strip_file_component_suffix)
3531            .filter(|id| !id.is_empty())
3532            .unwrap_or_else(|| "file-component".to_string()),
3533        RefKind::Oci(reference) => canonical_oci_component_id(reference),
3534        RefKind::Repo(reference) => reference.trim_start_matches("repo://").to_string(),
3535        RefKind::Store(reference) => reference.trim_start_matches("store://").to_string(),
3536        #[cfg(feature = "fixture-resolver")]
3537        RefKind::Fixture(reference) => reference.trim_start_matches('/').to_string(),
3538    }
3539}
3540
3541fn canonical_oci_component_id(reference: &str) -> String {
3542    let raw = reference.trim_start_matches("oci://");
3543    if let Ok(parsed) = Reference::try_from(raw) {
3544        return parsed.repository().to_string();
3545    }
3546    let without_digest = raw.split('@').next().unwrap_or(raw);
3547    let last_colon = without_digest.rfind(':');
3548    let last_slash = without_digest.rfind('/');
3549    if let (Some(colon), Some(slash)) = (last_colon, last_slash)
3550        && colon > slash
3551    {
3552        return without_digest[..colon].to_string();
3553    }
3554    without_digest.to_string()
3555}
3556
/// Removes a trailing `__<suffix>` from a file stem when the suffix consists
/// solely of ASCII digits and underscores and contains at least one
/// underscore (e.g. `name__123_456` becomes `name`).
///
/// The input is returned unchanged when there is no such suffix or when
/// stripping would leave an empty name.
fn strip_file_component_suffix(input: &str) -> String {
    let stripped = input.rsplit_once("__").and_then(|(stem, tail)| {
        let numeric_with_separator =
            tail.contains('_') && tail.chars().all(|ch| ch == '_' || ch.is_ascii_digit());
        if numeric_with_separator && !stem.is_empty() {
            Some(stem)
        } else {
            None
        }
    });
    stripped.unwrap_or(input).to_string()
}
3567
/// Returns the trimmed content type, or `fallback` when `current` is absent
/// or blank.
fn normalize_content_type(current: Option<&str>, fallback: &str) -> String {
    match current.map(str::trim) {
        Some(value) if !value.is_empty() => value.to_owned(),
        _ => fallback.to_owned(),
    }
}
3576
/// Returns the size in bytes reported by the path's metadata, or `None` when
/// the metadata cannot be read (for example because the path does not exist).
fn file_size_if_exists(path: &Path) -> Option<u64> {
    match path.metadata() {
        Ok(metadata) => Some(metadata.len()),
        Err(_) => None,
    }
}
3580
/// Looks for a `describe.cbor` file next to the given wasm path.
///
/// Returns the sidecar's displayed path when it exists, `None` when the wasm
/// path has no parent directory or the sidecar is missing.
fn source_sidecar_describe_ref(source_wasm_path: &Path) -> Option<String> {
    let sidecar = source_wasm_path.parent()?.join("describe.cbor");
    if sidecar.exists() {
        Some(sidecar.display().to_string())
    } else {
        None
    }
}
3589
3590fn resolve_component_id_from_cache(wasm_path: &Path, fallback: &str) -> String {
3591    let cache_dir = match wasm_path.parent() {
3592        Some(dir) => dir,
3593        None => return fallback.to_string(),
3594    };
3595    read_component_id_from_json(cache_dir.join("metadata.json"))
3596        .or_else(|| read_component_id_from_json(cache_dir.join("component.manifest.json")))
3597        .unwrap_or_else(|| fallback.to_string())
3598}
3599
3600fn read_component_id_from_json(path: PathBuf) -> Option<String> {
3601    let bytes = fs::read(path).ok()?;
3602    let value: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
3603    extract_string_anywhere(
3604        &value,
3605        &["component_id", "componentId", "canonical_component_id"],
3606    )
3607    .filter(|id| !id.trim().is_empty())
3608}
3609
3610fn resolve_abi_version_from_cache(wasm_path: &Path) -> Option<String> {
3611    let cache_dir = wasm_path.parent()?;
3612    read_abi_version_from_json(cache_dir.join("metadata.json"))
3613        .or_else(|| read_abi_version_from_json(cache_dir.join("component.manifest.json")))
3614}
3615
3616fn resolve_describe_artifact_ref_from_cache(wasm_path: &Path) -> Option<String> {
3617    let cache_dir = wasm_path.parent()?;
3618    read_describe_artifact_ref_from_json(cache_dir.join("metadata.json"))
3619        .or_else(|| read_describe_artifact_ref_from_json(cache_dir.join("component.manifest.json")))
3620        .or_else(|| {
3621            let describe = cache_dir.join("describe.cbor");
3622            if describe.exists() {
3623                Some(describe.display().to_string())
3624            } else {
3625                None
3626            }
3627        })
3628}
3629
3630fn read_describe_artifact_ref_from_json(path: PathBuf) -> Option<String> {
3631    let bytes = fs::read(path).ok()?;
3632    let value: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
3633    let found = extract_string_anywhere(
3634        &value,
3635        &[
3636            "describe_artifact_ref",
3637            "describeArtifactRef",
3638            "describe_ref",
3639            "describeRef",
3640        ],
3641    )?;
3642    let trimmed = found.trim();
3643    if trimmed.is_empty() {
3644        None
3645    } else {
3646        Some(trimmed.to_string())
3647    }
3648}
3649
3650fn read_abi_version_from_json(path: PathBuf) -> Option<String> {
3651    let bytes = fs::read(path).ok()?;
3652    let value: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
3653    let found = extract_string_anywhere(&value, &["abi_version", "abiVersion", "abi"])?;
3654    normalize_abi_version(&found)
3655}
3656
3657fn normalize_abi_version(input: &str) -> Option<String> {
3658    let candidate = input.trim();
3659    if candidate.is_empty() {
3660        return None;
3661    }
3662    match semver::Version::parse(candidate) {
3663        Ok(version) => Some(version.to_string()),
3664        Err(_) => None,
3665    }
3666}
3667
3668fn extract_string_anywhere(value: &serde_json::Value, keys: &[&str]) -> Option<String> {
3669    match value {
3670        serde_json::Value::Object(map) => {
3671            for key in keys {
3672                if let Some(serde_json::Value::String(found)) = map.get(*key) {
3673                    return Some(found.clone());
3674                }
3675            }
3676            for nested in map.values() {
3677                if let Some(found) = extract_string_anywhere(nested, keys) {
3678                    return Some(found);
3679                }
3680            }
3681            None
3682        }
3683        serde_json::Value::Array(items) => {
3684            for item in items {
3685                if let Some(found) = extract_string_anywhere(item, keys) {
3686                    return Some(found);
3687                }
3688            }
3689            None
3690        }
3691        _ => None,
3692    }
3693}
3694
3695fn map_registry_target(target: &str, base: Option<&str>) -> Option<String> {
3696    if Reference::try_from(target).is_ok() {
3697        return Some(target.to_string());
3698    }
3699    let base = base?;
3700    let normalized_base = base.trim_end_matches('/');
3701    let normalized_target = target.trim_start_matches('/');
3702    Some(format!("{normalized_base}/{normalized_target}"))
3703}
3704
/// Result of parsing a `greentic-biz/<tenant>/<package-path>` store target.
#[derive(Clone, Debug, PartialEq, Eq)]
struct GreenticBizStoreTarget {
    /// The `<tenant>` segment of the store target, trimmed of whitespace.
    tenant: String,
    /// The registry reference the package path maps to
    /// (`ghcr.io/greentic-biz/<package-path>`).
    mapped_reference: String,
}
3710
/// Reports whether a store target addresses the `greentic-biz` namespace:
/// either exactly `greentic-biz` or anything beneath `greentic-biz/`.
fn is_greentic_biz_store_target(target: &str) -> bool {
    match target.strip_prefix("greentic-biz") {
        Some(rest) => rest.is_empty() || rest.starts_with('/'),
        None => false,
    }
}
3714
3715fn parse_greentic_biz_store_target(target: &str) -> Result<GreenticBizStoreTarget, DistError> {
3716    let remainder = target.strip_prefix("greentic-biz/").ok_or_else(|| {
3717        DistError::InvalidInput(
3718            "store://greentic-biz refs must include `<tenant>/<package-path>`".into(),
3719        )
3720    })?;
3721    let (tenant, package_path) = remainder.split_once('/').ok_or_else(|| {
3722        DistError::InvalidInput(
3723            "store://greentic-biz refs must include `<tenant>/<package-path>`".into(),
3724        )
3725    })?;
3726    let tenant = tenant.trim();
3727    let package_path = package_path.trim_matches('/');
3728    if tenant.is_empty() || package_path.is_empty() {
3729        return Err(DistError::InvalidInput(
3730            "store://greentic-biz refs must include non-empty `<tenant>/<package-path>`".into(),
3731        ));
3732    }
3733    Ok(GreenticBizStoreTarget {
3734        tenant: tenant.to_string(),
3735        mapped_reference: format!("ghcr.io/greentic-biz/{package_path}"),
3736    })
3737}
3738
/// Classification of a user-supplied artifact reference, as produced by
/// `classify_reference`.
enum RefKind {
    /// A `sha256:` digest, stored in normalized form.
    Digest(String),
    /// An `http://` or `https://` URL, stored as the original input.
    Http(String),
    /// A local file, from a `file://` URL or an existing filesystem path.
    File(PathBuf),
    /// An OCI reference; a leading `oci://` scheme has been removed.
    Oci(String),
    /// A `repo://` reference with the scheme removed.
    Repo(String),
    /// A `store://` reference with the scheme removed.
    Store(String),
    /// A `fixture://` reference with the scheme removed (only with the
    /// `fixture-resolver` feature).
    #[cfg(feature = "fixture-resolver")]
    Fixture(String),
}
3749
3750fn classify_reference(input: &str) -> Result<RefKind, DistError> {
3751    if is_digest(input) {
3752        return Ok(RefKind::Digest(normalize_digest(input)));
3753    }
3754    if let Ok(url) = Url::parse(input) {
3755        match url.scheme() {
3756            "http" | "https" => return Ok(RefKind::Http(input.to_string())),
3757            "file" => {
3758                if let Ok(path) = url.to_file_path() {
3759                    return Ok(RefKind::File(path));
3760                }
3761            }
3762            "oci" => {
3763                let trimmed = input.trim_start_matches("oci://");
3764                return Ok(RefKind::Oci(trimmed.to_string()));
3765            }
3766            "repo" => {
3767                let trimmed = input.trim_start_matches("repo://");
3768                return Ok(RefKind::Repo(trimmed.to_string()));
3769            }
3770            "store" => {
3771                let trimmed = input.trim_start_matches("store://");
3772                return Ok(RefKind::Store(trimmed.to_string()));
3773            }
3774            #[cfg(feature = "fixture-resolver")]
3775            "fixture" => {
3776                let trimmed = input.trim_start_matches("fixture://");
3777                return Ok(RefKind::Fixture(trimmed.to_string()));
3778            }
3779            _ => {}
3780        }
3781    }
3782    let path = Path::new(input);
3783    if path.exists() {
3784        return Ok(RefKind::File(path.to_path_buf()));
3785    }
3786    if Reference::try_from(input).is_ok() {
3787        Ok(RefKind::Oci(input.to_string()))
3788    } else {
3789        Err(DistError::InvalidRef {
3790            reference: input.to_string(),
3791        })
3792    }
3793}
3794
/// Reports whether `s` is a bare SHA-256 digest: the `sha256:` prefix
/// followed by exactly 64 hexadecimal characters.
///
/// Checking the characters (not just the length) keeps arbitrary
/// 64-byte strings that merely start with `sha256:` from being classified
/// as digests.
fn is_digest(s: &str) -> bool {
    s.strip_prefix("sha256:")
        .is_some_and(|hex| hex.len() == 64 && hex.bytes().all(|byte| byte.is_ascii_hexdigit()))
}
3798
3799fn is_loopback_http(url: &Url) -> bool {
3800    url.scheme() == "http" && matches!(url.host_str(), Some("localhost") | Some("127.0.0.1"))
3801}
3802
3803fn ensure_secure_http_url(url: &str, allow_loopback_local: bool) -> Result<Url, DistError> {
3804    let parsed = Url::parse(url).map_err(|_| DistError::InvalidRef {
3805        reference: url.to_string(),
3806    })?;
3807    if parsed.scheme() == "https" || (allow_loopback_local && is_loopback_http(&parsed)) {
3808        Ok(parsed)
3809    } else {
3810        Err(DistError::InsecureUrl {
3811            url: url.to_string(),
3812        })
3813    }
3814}
3815
/// Object form of a lockfile: an optional schema version plus a list of
/// component entries. Both fields fall back to their defaults when absent.
#[derive(Debug, serde::Deserialize)]
struct LockFile {
    /// Optional schema version; read by `parse_lockfile` but not acted upon.
    #[serde(default)]
    schema_version: Option<u64>,
    /// Component entries; empty when the key is missing.
    #[serde(default)]
    components: Vec<LockEntry>,
}
3823
/// A single lockfile entry. Deserialized untagged, so an entry may be either
/// a bare string or an object with named fields.
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
enum LockEntry {
    /// Entry written as a plain string; treated as the reference.
    String(String),
    /// Entry written as an object; see [`LockComponent`].
    Object(LockComponent),
}
3830
/// Object form of a lockfile entry. Every field is optional.
#[derive(Debug, serde::Deserialize)]
// `name` is deserialized but not read by `LockEntry::to_resolved`.
#[allow(dead_code)]
struct LockComponent {
    /// Reference under the `reference` key; preferred when present.
    reference: Option<String>,
    /// Reference under the `ref` key; used when `reference` is absent.
    #[serde(rename = "ref")]
    ref_field: Option<String>,
    /// Digest of the component; also the last-resort reference.
    digest: Option<String>,
    /// Optional name of the component.
    name: Option<String>,
}
3840
3841impl LockEntry {
3842    fn to_resolved(&self) -> LockResolvedEntry {
3843        match self {
3844            LockEntry::String(s) => LockResolvedEntry {
3845                reference: Some(s.clone()),
3846                digest: None,
3847            },
3848            LockEntry::Object(obj) => LockResolvedEntry {
3849                reference: obj
3850                    .reference
3851                    .clone()
3852                    .or_else(|| obj.ref_field.clone())
3853                    .or_else(|| obj.digest.clone()),
3854                digest: obj.digest.clone(),
3855            },
3856        }
3857    }
3858}
3859
/// Flattened lockfile entry produced by `LockEntry::to_resolved`.
#[derive(Clone, Debug)]
struct LockResolvedEntry {
    /// The reference to resolve, when the entry supplied one (or a digest to
    /// stand in for it).
    reference: Option<String>,
    /// The digest recorded in the entry, when present.
    digest: Option<String>,
}
3865
3866fn parse_lockfile(data: &str) -> Result<Vec<LockResolvedEntry>, serde_json::Error> {
3867    if let Ok(entries) = serde_json::from_str::<Vec<LockEntry>>(data) {
3868        return Ok(entries.into_iter().map(|e| e.to_resolved()).collect());
3869    }
3870    let parsed: LockFile = serde_json::from_str(data)?;
3871    let _ = parsed.schema_version;
3872    Ok(parsed
3873        .components
3874        .into_iter()
3875        .map(|c| c.to_resolved())
3876        .collect())
3877}
3878
// Unit tests for the cache-side metadata lookups. Each test builds a scratch
// cache directory and adds JSON files step by step to exercise the lookup
// precedence.
#[cfg(test)]
mod tests {
    use super::*;

    /// The component id comes from `metadata.json` when available, then from
    /// `component.manifest.json`, and otherwise from the supplied fallback.
    #[test]
    fn component_id_prefers_metadata_then_manifest_then_fallback() {
        let temp = tempfile::tempdir().unwrap();
        let cache_dir = temp.path().join("cache");
        fs::create_dir_all(&cache_dir).unwrap();
        let wasm = cache_dir.join("component.wasm");
        fs::write(&wasm, b"wasm").unwrap();

        // No JSON files yet: the fallback is returned.
        let fallback = "repo/name";
        assert_eq!(resolve_component_id_from_cache(&wasm, fallback), fallback);

        // Only the manifest exists: its id is used.
        fs::write(
            cache_dir.join("component.manifest.json"),
            r#"{"component_id":"from-manifest"}"#,
        )
        .unwrap();
        assert_eq!(
            resolve_component_id_from_cache(&wasm, fallback),
            "from-manifest"
        );

        // Both files exist: metadata takes precedence over the manifest.
        fs::write(
            cache_dir.join("metadata.json"),
            r#"{"component_id":"from-metadata"}"#,
        )
        .unwrap();
        assert_eq!(
            resolve_component_id_from_cache(&wasm, fallback),
            "from-metadata"
        );
    }

    /// The ABI version is optional, must be valid semver, and prefers
    /// `metadata.json` over `component.manifest.json`.
    #[test]
    fn abi_version_is_best_effort_from_metadata_or_manifest() {
        let temp = tempfile::tempdir().unwrap();
        let cache_dir = temp.path().join("cache");
        fs::create_dir_all(&cache_dir).unwrap();
        let wasm = cache_dir.join("component.wasm");
        fs::write(&wasm, b"wasm").unwrap();

        // No JSON files yet: no version can be resolved.
        assert_eq!(resolve_abi_version_from_cache(&wasm), None);

        // Only the manifest exists: its version is used.
        fs::write(
            cache_dir.join("component.manifest.json"),
            r#"{"abi_version":"0.6.0"}"#,
        )
        .unwrap();
        assert_eq!(
            resolve_abi_version_from_cache(&wasm),
            Some("0.6.0".to_string())
        );

        // Metadata with a non-semver value is ignored; the manifest still wins.
        fs::write(cache_dir.join("metadata.json"), r#"{"abi":"not-semver"}"#).unwrap();
        assert_eq!(
            resolve_abi_version_from_cache(&wasm),
            Some("0.6.0".to_string())
        );

        // Metadata with a valid version takes precedence over the manifest.
        fs::write(cache_dir.join("metadata.json"), r#"{"abiVersion":"1.2.3"}"#).unwrap();
        assert_eq!(
            resolve_abi_version_from_cache(&wasm),
            Some("1.2.3".to_string())
        );
    }
}