1use crate::oci_components::{ComponentResolveOptions, DefaultRegistryClient, OciComponentResolver};
2use crate::store_auth::{
3 StoreCredentials, default_store_auth_path, default_store_state_path, load_login,
4};
5use async_trait::async_trait;
6use oci_distribution::Reference;
7use reqwest::Url;
8use serde::{Deserialize, Serialize};
9use sha2::{Digest, Sha256};
10use std::cell::OnceCell;
11use std::fs;
12use std::path::{Path, PathBuf};
13use std::sync::Arc;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::time::{SystemTime, UNIX_EPOCH};
16use thiserror::Error;
17
18const WASM_CONTENT_TYPE: &str = "application/wasm";
19static LAST_USED_COUNTER: AtomicU64 = AtomicU64::new(1);
20
21const CACHE_FORMAT_VERSION: u32 = 1;
22
23#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
24pub enum ArtifactSourceKind {
25 Oci,
26 Https,
27 File,
28 Fixture,
29 Repo,
30 Store,
31 CacheDigest,
32}
33
34#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
35pub struct TransportHints {
36 pub offline: bool,
37 pub allow_insecure_local_http: bool,
38}
39
40#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
41pub struct ArtifactSource {
42 pub raw_ref: String,
43 pub kind: ArtifactSourceKind,
44 pub transport_hints: TransportHints,
45 pub dev_mode: bool,
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
49pub enum ArtifactType {
50 Bundle,
51 Pack,
52 Component,
53}
54
55#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
56pub enum ResolvedVia {
57 Direct,
58 TagResolution,
59 RepoMapping,
60 StoreMapping,
61 Fixture,
62 File,
63 Https,
64 CacheDigest,
65}
66
67#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
68pub struct ArtifactDescriptor {
69 pub artifact_type: ArtifactType,
70 pub source_kind: ArtifactSourceKind,
71 pub raw_ref: String,
72 pub canonical_ref: String,
73 pub digest: String,
74 pub media_type: String,
75 pub size_bytes: u64,
76 pub created_at: Option<u64>,
77 pub annotations: serde_json::Map<String, serde_json::Value>,
78 pub manifest_digest: Option<String>,
79 pub resolved_via: ResolvedVia,
80 pub signature_refs: Vec<String>,
81 pub sbom_refs: Vec<String>,
82}
83
84impl ArtifactDescriptor {
85 pub fn cache_key(&self) -> String {
86 self.digest.clone()
87 }
88}
89
90#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
91pub enum IntegrityState {
92 Partial,
93 Ready,
94 Corrupt,
95 Evicted,
96}
97
98#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
99pub struct SourceSnapshot {
100 pub raw_ref: String,
101 pub canonical_ref: String,
102 pub source_kind: ArtifactSourceKind,
103 pub authoritative: bool,
104}
105
106#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
107pub enum CacheEntryState {
108 Partial,
109 Ready,
110 Corrupt,
111 Evicted,
112}
113
114#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
115pub struct CacheEntry {
116 pub format_version: u32,
117 pub cache_key: String,
118 pub digest: String,
119 pub media_type: String,
120 pub size_bytes: u64,
121 pub artifact_type: ArtifactType,
122 pub source_kind: ArtifactSourceKind,
123 pub raw_ref: String,
124 pub canonical_ref: String,
125 pub fetched_at: u64,
126 pub last_accessed_at: u64,
127 pub last_verified_at: Option<u64>,
128 pub state: CacheEntryState,
129 pub advisory_epoch: Option<u64>,
130 pub signature_summary: Option<serde_json::Value>,
131 pub local_path: PathBuf,
132 pub source_snapshot: SourceSnapshot,
133}
134
135#[derive(Clone, Debug, Default, PartialEq, Eq)]
136pub struct ResolvePolicy;
137
138#[derive(Clone, Debug, Default, PartialEq, Eq)]
139pub struct CachePolicy;
140
141#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
142pub struct RetentionReport {
143 pub scanned_entries: usize,
144 pub kept: usize,
145 pub evicted: usize,
146 pub protected: usize,
147 pub bytes_reclaimed: u64,
148 pub refusals: Vec<String>,
149}
150
151#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
152pub enum RetentionEnvironment {
153 Dev,
154 Staging,
155 Prod,
156}
157
158#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
159pub struct RetentionInput {
160 pub entries: Vec<CacheEntry>,
161 pub active_bundle_ids: Vec<String>,
162 pub staged_bundle_ids: Vec<String>,
163 pub warming_bundle_ids: Vec<String>,
164 pub ready_bundle_ids: Vec<String>,
165 pub draining_bundle_ids: Vec<String>,
166 pub session_referenced_bundle_ids: Vec<String>,
167 pub max_cache_bytes: u64,
168 pub max_entry_age: Option<u64>,
169 pub minimum_rollback_depth: usize,
170 pub environment: RetentionEnvironment,
171}
172
173#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
174pub enum RetentionDisposition {
175 Keep,
176 Evict,
177 Protect,
178}
179
180#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
181pub struct RetentionDecision {
182 pub cache_key: String,
183 pub bundle_id: String,
184 pub decision: RetentionDisposition,
185 pub reason_code: String,
186 pub reason_detail: String,
187}
188
189#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
190pub struct RetentionOutcome {
191 pub decisions: Vec<RetentionDecision>,
192 pub report: RetentionReport,
193}
194
195#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
196pub enum VerificationEnvironment {
197 Dev,
198 Staging,
199 Prod,
200}
201
202#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
203pub struct VerificationPolicy {
204 pub require_signature: bool,
205 pub trusted_issuers: Vec<String>,
206 pub deny_issuers: Vec<String>,
207 pub deny_digests: Vec<String>,
208 pub allowed_media_types: Vec<String>,
209 pub require_sbom: bool,
210 pub minimum_operator_version: Option<String>,
211 pub environment: VerificationEnvironment,
212}
213
214impl Default for VerificationPolicy {
215 fn default() -> Self {
216 Self {
217 require_signature: false,
218 trusted_issuers: Vec::new(),
219 deny_issuers: Vec::new(),
220 deny_digests: Vec::new(),
221 allowed_media_types: Vec::new(),
222 require_sbom: false,
223 minimum_operator_version: None,
224 environment: VerificationEnvironment::Dev,
225 }
226 }
227}
228
229#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
230pub struct AdvisorySet {
231 pub version: String,
232 pub issued_at: u64,
233 pub source: String,
234 pub deny_digests: Vec<String>,
235 pub deny_issuers: Vec<String>,
236 pub minimum_operator_version: Option<String>,
237 pub release_train: Option<ReleaseTrainDescriptor>,
238 pub expires_at: Option<u64>,
239 pub next_refresh_hint: Option<u64>,
240}
241
242#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
243pub struct ReleaseTrainDescriptor {
244 pub train_id: String,
245 pub operator_digest: Option<String>,
246 pub bundle_digests: Vec<String>,
247 pub required_extension_digests: Vec<String>,
248 pub baseline_observer_digest: Option<String>,
249}
250
251#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
252pub enum VerificationOutcome {
253 Passed,
254 Failed,
255 Warning,
256 Skipped,
257}
258
259#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
260pub struct VerificationCheck {
261 pub name: String,
262 pub outcome: VerificationOutcome,
263 pub detail: String,
264 pub payload: Option<serde_json::Value>,
265}
266
267#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
268pub struct VerificationReport {
269 pub artifact_digest: String,
270 pub canonical_ref: String,
271 pub checks: Vec<VerificationCheck>,
272 pub passed: bool,
273 pub warnings: Vec<String>,
274 pub errors: Vec<String>,
275 pub policy_fingerprint: String,
276 pub advisory_version: Option<String>,
277 pub cache_entry_fingerprint: Option<String>,
278}
279
280#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
281pub struct PreliminaryDecision {
282 pub passed: bool,
283 pub checks: Vec<VerificationCheck>,
284 pub warnings: Vec<String>,
285 pub errors: Vec<String>,
286}
287
288#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
289pub enum AccessMode {
290 Userspace,
291 Mount,
292}
293
294#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
295pub enum BundleOpenMode {
296 CacheReuse,
297 Userspace,
298 Mount,
299}
300
301#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
302pub struct StageBundleInput {
303 pub bundle_ref: String,
304 pub requested_access_mode: AccessMode,
305 pub verification_policy_ref: String,
306 pub cache_policy_ref: String,
307 pub tenant: Option<String>,
308 pub team: Option<String>,
309}
310
311#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
312pub struct WarmBundleInput {
313 pub bundle_id: String,
314 pub cache_key: String,
315 pub smoke_test: bool,
316 pub dry_run: bool,
317 pub expected_operator_version: Option<String>,
318}
319
320#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
321pub struct RollbackBundleInput {
322 pub target_bundle_id: String,
323 pub expected_cache_key: Option<String>,
324}
325
326#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
327pub struct BundleManifestSummary {
328 pub component_id: String,
329 pub abi_version: Option<String>,
330 pub describe_artifact_ref: Option<String>,
331 pub artifact_type: ArtifactType,
332 pub media_type: String,
333 pub size_bytes: u64,
334}
335
336#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
337pub struct StageAuditFields {
338 pub staged_at: u64,
339 pub requested_access_mode: AccessMode,
340 pub verification_policy_ref: String,
341 pub cache_policy_ref: String,
342 pub tenant: Option<String>,
343 pub team: Option<String>,
344}
345
346#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
347pub struct WarmAuditFields {
348 pub warmed_at: u64,
349 pub smoke_test: bool,
350 pub dry_run: bool,
351 pub reopened_from_cache: bool,
352}
353
354#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
355pub struct RollbackAuditFields {
356 pub rolled_back_at: u64,
357 pub reopened_from_cache: bool,
358 pub expected_cache_key: Option<String>,
359}
360
361#[derive(Debug)]
362pub struct StageBundleResult {
363 pub bundle_id: String,
364 pub canonical_ref: String,
365 pub descriptor: ArtifactDescriptor,
366 pub resolved_artifact: ResolvedArtifact,
367 pub verification_report: VerificationReport,
368 pub cache_entry: CacheEntry,
369 pub stage_audit_fields: StageAuditFields,
370}
371
372#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
373pub struct WarmBundleResult {
374 pub bundle_id: String,
375 pub verification_report: VerificationReport,
376 pub bundle_manifest_summary: BundleManifestSummary,
377 pub bundle_open_mode: BundleOpenMode,
378 pub warnings: Vec<String>,
379 pub errors: Vec<String>,
380 pub warm_audit_fields: WarmAuditFields,
381}
382
383#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
384pub struct RollbackBundleResult {
385 pub bundle_id: String,
386 pub reopened_from_cache: bool,
387 pub cache_entry: CacheEntry,
388 pub verification_report: VerificationReport,
389 pub rollback_audit_fields: RollbackAuditFields,
390}
391
392#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
393pub struct BundleRecord {
394 pub bundle_id: String,
395 pub cache_key: String,
396 pub canonical_ref: String,
397 pub source_kind: ArtifactSourceKind,
398 pub fetched_at: u64,
399 pub lifecycle_state: BundleLifecycleState,
400}
401
402#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
403pub enum BundleLifecycleState {
404 Inactive,
405 Staged,
406 Warming,
407 Ready,
408 Draining,
409}
410
411#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
412pub struct ArtifactOpenRequest {
413 pub bundle_id: String,
414 pub dry_run: bool,
415 pub smoke_test: bool,
416}
417
418#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
419pub struct ArtifactOpenOutput {
420 pub bundle_manifest_summary: BundleManifestSummary,
421 pub bundle_open_mode: BundleOpenMode,
422 pub warnings: Vec<String>,
423}
424
425#[deprecated(note = "use ArtifactOpenRequest instead")]
426pub type BundleOpenRequest = ArtifactOpenRequest;
427
428#[deprecated(note = "use ArtifactOpenOutput instead")]
429pub type BundleOpenOutput = ArtifactOpenOutput;
430
431#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
432pub enum IntegrationErrorCode {
433 InvalidReference,
434 UnsupportedSource,
435 ResolutionFailed,
436 DownloadFailed,
437 ResolutionUnavailable,
438 DigestMismatch,
439 MediaTypeRejected,
440 IssuerRejected,
441 DigestDenied,
442 SignatureRequired,
443 CacheCorrupt,
444 CacheMiss,
445 OfflineRequiredButUnavailable,
446 UnsupportedArtifactType,
447 DescriptorCorrupt,
448 PolicyInputInvalid,
449 AdvisoryRejected,
450 VerificationFailed,
451 BundleOpenFailed,
452}
453
454#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
455pub struct IntegrationError {
456 pub code: IntegrationErrorCode,
457 pub summary: String,
458 pub retryable: bool,
459 pub details: Option<serde_json::Value>,
460}
461
462pub trait ArtifactOpener: Send + Sync {
463 fn open(
464 &self,
465 artifact: &ResolvedArtifact,
466 request: &ArtifactOpenRequest,
467 ) -> Result<ArtifactOpenOutput, IntegrationError>;
468}
469
470#[deprecated(
471 note = "use ArtifactOpener instead; format ownership belongs outside distributor-client"
472)]
473pub trait BundleOpener: Send + Sync {
474 fn open(
475 &self,
476 artifact: &ResolvedArtifact,
477 request: &ArtifactOpenRequest,
478 ) -> Result<ArtifactOpenOutput, IntegrationError>;
479}
480
481#[allow(deprecated)]
482impl<T: BundleOpener + ?Sized> ArtifactOpener for T {
483 fn open(
484 &self,
485 artifact: &ResolvedArtifact,
486 request: &ArtifactOpenRequest,
487 ) -> Result<ArtifactOpenOutput, IntegrationError> {
488 BundleOpener::open(self, artifact, request)
489 }
490}
491
492#[derive(Clone, Debug)]
493pub struct DistOptions {
494 pub cache_dir: PathBuf,
495 pub allow_tags: bool,
496 pub offline: bool,
497 pub allow_insecure_local_http: bool,
498 pub cache_max_bytes: u64,
499 pub repo_registry_base: Option<String>,
500 pub store_registry_base: Option<String>,
501 pub store_auth_path: PathBuf,
502 pub store_state_path: PathBuf,
503 #[cfg(feature = "fixture-resolver")]
504 pub fixture_dir: Option<PathBuf>,
505}
506
507impl Default for DistOptions {
508 fn default() -> Self {
509 let offline = std::env::var("GREENTIC_DIST_OFFLINE").is_ok_and(|v| v == "1");
510 let allow_insecure_local_http =
511 std::env::var("GREENTIC_DIST_ALLOW_INSECURE_LOCAL_HTTP").is_ok_and(|v| v == "1");
512 let cache_max_bytes = std::env::var("GREENTIC_CACHE_MAX_BYTES")
513 .ok()
514 .and_then(|v| v.parse::<u64>().ok())
515 .unwrap_or(3 * 1024 * 1024 * 1024);
516 let cache_dir = std::env::var("GREENTIC_CACHE_DIR")
517 .or_else(|_| std::env::var("GREENTIC_DIST_CACHE_DIR"))
518 .map(PathBuf::from)
519 .unwrap_or_else(|_| default_distribution_cache_root());
520 Self {
521 cache_dir,
522 allow_tags: true,
523 offline,
524 allow_insecure_local_http,
525 cache_max_bytes,
526 repo_registry_base: std::env::var("GREENTIC_REPO_REGISTRY_BASE").ok(),
527 store_registry_base: std::env::var("GREENTIC_STORE_REGISTRY_BASE").ok(),
528 store_auth_path: default_store_auth_path(),
529 store_state_path: default_store_state_path(),
530 #[cfg(feature = "fixture-resolver")]
531 fixture_dir: std::env::var("GREENTIC_FIXTURE_DIR")
532 .ok()
533 .map(PathBuf::from),
534 }
535 }
536}
537
538#[derive(Debug)]
539pub struct ResolvedArtifact {
540 pub descriptor: ArtifactDescriptor,
541 pub cache_key: String,
542 pub local_path: PathBuf,
543 pub fetched_at: u64,
544 pub integrity_state: IntegrityState,
545 pub source_snapshot: SourceSnapshot,
546 pub resolved_digest: String,
547 pub wasm_bytes: Option<Vec<u8>>,
548 pub wasm_path: Option<PathBuf>,
549 pub component_id: String,
550 pub abi_version: Option<String>,
551 pub describe_artifact_ref: Option<String>,
552 pub content_length: Option<u64>,
553 pub content_type: Option<String>,
554 pub fetched: bool,
555 pub source: ArtifactSource,
556 pub digest: String,
557 pub cache_path: Option<PathBuf>,
558 loaded_wasm_bytes: OnceCell<Vec<u8>>,
559}
560
561impl ResolvedArtifact {
562 #[allow(clippy::too_many_arguments)]
563 fn from_path(
564 resolved_digest: String,
565 wasm_path: PathBuf,
566 component_id: String,
567 abi_version: Option<String>,
568 describe_artifact_ref: Option<String>,
569 content_length: Option<u64>,
570 content_type: Option<String>,
571 fetched: bool,
572 source: LegacyArtifactSource,
573 ) -> Self {
574 let source_kind = source.kind();
575 let canonical_ref = source.canonical_ref(&resolved_digest);
576 let fetched_at = unix_now();
577 let public_source = artifact_source_from_legacy(&source);
578 let descriptor = ArtifactDescriptor {
579 artifact_type: ArtifactType::Component,
580 source_kind: source_kind.clone(),
581 raw_ref: public_source.raw_ref.clone(),
582 canonical_ref: canonical_ref.clone(),
583 digest: resolved_digest.clone(),
584 media_type: content_type
585 .clone()
586 .unwrap_or_else(|| WASM_CONTENT_TYPE.to_string()),
587 size_bytes: content_length.unwrap_or_default(),
588 created_at: None,
589 annotations: serde_json::Map::new(),
590 manifest_digest: None,
591 resolved_via: source.resolved_via(),
592 signature_refs: Vec::new(),
593 sbom_refs: Vec::new(),
594 };
595 let source_snapshot = SourceSnapshot {
596 raw_ref: descriptor.raw_ref.clone(),
597 canonical_ref: descriptor.canonical_ref.clone(),
598 source_kind: descriptor.source_kind.clone(),
599 authoritative: descriptor.raw_ref == descriptor.canonical_ref,
600 };
601 Self {
602 descriptor,
603 cache_key: resolved_digest.clone(),
604 local_path: wasm_path.clone(),
605 fetched_at,
606 integrity_state: IntegrityState::Ready,
607 source_snapshot,
608 digest: resolved_digest.clone(),
609 cache_path: Some(wasm_path.clone()),
610 resolved_digest,
611 wasm_bytes: None,
612 wasm_path: Some(wasm_path),
613 component_id,
614 abi_version,
615 describe_artifact_ref,
616 content_length,
617 content_type,
618 fetched,
619 source: public_source,
620 loaded_wasm_bytes: OnceCell::new(),
621 }
622 }
623
624 pub fn validate_payload(&self) -> Result<(), DistError> {
625 let has_bytes = self.wasm_bytes.is_some();
626 let has_path = self.wasm_path.is_some();
627 if has_bytes == has_path {
628 return Err(DistError::CorruptArtifact {
629 reference: self.resolved_digest.clone(),
630 reason: "expected exactly one of wasm_bytes or wasm_path".into(),
631 });
632 }
633 Ok(())
634 }
635
636 pub fn wasm_bytes(&self) -> Result<&[u8], DistError> {
637 self.validate_payload()?;
638 if let Some(bytes) = self.wasm_bytes.as_deref() {
639 return Ok(bytes);
640 }
641 let path = self
642 .wasm_path
643 .as_ref()
644 .ok_or_else(|| DistError::CorruptArtifact {
645 reference: self.resolved_digest.clone(),
646 reason: "missing wasm path".into(),
647 })?;
648 if self.loaded_wasm_bytes.get().is_none() {
649 let loaded = fs::read(path).map_err(|source| DistError::CacheError {
650 path: path.display().to_string(),
651 source,
652 })?;
653 let _ = self.loaded_wasm_bytes.set(loaded);
654 }
655 Ok(self
656 .loaded_wasm_bytes
657 .get()
658 .expect("loaded_wasm_bytes must be set")
659 .as_slice())
660 }
661
662 pub fn lock_hint(&self, source_ref: impl Into<String>) -> LockHint {
663 LockHint {
664 source_ref: source_ref.into(),
665 resolved_digest: self.resolved_digest.clone(),
666 content_length: self.content_length,
667 content_type: self.content_type.clone(),
668 abi_version: self.abi_version.clone(),
669 component_id: self.component_id.clone(),
670 }
671 }
672}
673
674#[derive(Clone, Debug, PartialEq, Eq)]
675pub struct LockHint {
676 pub source_ref: String,
677 pub resolved_digest: String,
678 pub content_length: Option<u64>,
679 pub content_type: Option<String>,
680 pub abi_version: Option<String>,
681 pub component_id: String,
682}
683
684#[derive(Clone, Debug)]
685enum LegacyArtifactSource {
686 Digest,
687 Http(String),
688 File(PathBuf),
689 Oci(String),
690 Repo(String),
691 Store(String),
692}
693
694impl LegacyArtifactSource {
695 fn kind(&self) -> ArtifactSourceKind {
696 match self {
697 Self::Digest => ArtifactSourceKind::CacheDigest,
698 Self::Http(_) => ArtifactSourceKind::Https,
699 Self::File(_) => ArtifactSourceKind::File,
700 Self::Oci(_) => ArtifactSourceKind::Oci,
701 Self::Repo(_) => ArtifactSourceKind::Repo,
702 Self::Store(_) => ArtifactSourceKind::Store,
703 }
704 }
705
706 fn raw_ref(&self) -> String {
707 match self {
708 Self::Digest => String::new(),
709 Self::Http(url) => url.clone(),
710 Self::File(path) => path.display().to_string(),
711 Self::Oci(reference) => format!("oci://{reference}"),
712 Self::Repo(reference) => reference.clone(),
713 Self::Store(reference) => reference.clone(),
714 }
715 }
716
717 fn canonical_ref(&self, digest: &str) -> String {
718 match self {
719 Self::Digest => digest.to_string(),
720 Self::Http(url) => format!("{url}@{digest}"),
721 Self::File(path) => format!("file://{}@{digest}", path.display()),
722 Self::Oci(reference) => canonical_oci_ref(reference, digest),
723 Self::Repo(reference) | Self::Store(reference) => {
724 let raw = reference
725 .trim_start_matches("repo://")
726 .trim_start_matches("store://");
727 canonical_oci_ref(raw, digest)
728 }
729 }
730 }
731
732 fn resolved_via(&self) -> ResolvedVia {
733 match self {
734 Self::Digest => ResolvedVia::CacheDigest,
735 Self::Http(_) => ResolvedVia::Https,
736 Self::File(_) => ResolvedVia::File,
737 Self::Oci(reference) => {
738 if reference.contains(':') && !reference.contains("@sha256:") {
739 ResolvedVia::TagResolution
740 } else {
741 ResolvedVia::Direct
742 }
743 }
744 Self::Repo(_) => ResolvedVia::RepoMapping,
745 Self::Store(_) => ResolvedVia::StoreMapping,
746 }
747 }
748}
749
750fn artifact_source_from_legacy(source: &LegacyArtifactSource) -> ArtifactSource {
751 ArtifactSource {
752 raw_ref: source.raw_ref(),
753 kind: source.kind(),
754 transport_hints: TransportHints::default(),
755 dev_mode: matches!(
756 source.kind(),
757 ArtifactSourceKind::Fixture | ArtifactSourceKind::File
758 ),
759 }
760}
761
762fn legacy_source_from_public(source: &ArtifactSource) -> LegacyArtifactSource {
763 match source.kind {
764 ArtifactSourceKind::CacheDigest => LegacyArtifactSource::Digest,
765 ArtifactSourceKind::Https => LegacyArtifactSource::Http(source.raw_ref.clone()),
766 ArtifactSourceKind::File | ArtifactSourceKind::Fixture => {
767 LegacyArtifactSource::File(PathBuf::from(source.raw_ref.clone()))
768 }
769 ArtifactSourceKind::Oci => {
770 LegacyArtifactSource::Oci(source.raw_ref.trim_start_matches("oci://").to_string())
771 }
772 ArtifactSourceKind::Repo => LegacyArtifactSource::Repo(source.raw_ref.clone()),
773 ArtifactSourceKind::Store => LegacyArtifactSource::Store(source.raw_ref.clone()),
774 }
775}
776
777pub struct DistClient {
778 cache: ComponentCache,
779 oci: OciComponentResolver<DefaultRegistryClient>,
780 http: reqwest::Client,
781 opts: DistOptions,
782 injected: Option<Arc<dyn ResolveRefInjector>>,
783 artifact_opener: Arc<dyn ArtifactOpener>,
784}
785
786#[derive(Clone, Debug, Default)]
787struct DefaultArtifactOpener;
788
789#[derive(Clone, Debug)]
790pub struct OciCacheInspection {
791 pub digest: String,
792 pub cache_dir: PathBuf,
793 pub selected_media_type: String,
794 pub fetched: bool,
795}
796
797#[derive(Clone, Debug)]
798#[deprecated(note = "use ArtifactSource with DistClient::resolve/fetch instead")]
799pub struct ResolveRefRequest {
800 pub reference: String,
801}
802
803#[derive(Clone, Debug, Default)]
804#[deprecated(note = "use ArtifactSource with DistClient::resolve/fetch instead")]
805pub struct ResolveComponentRequest {
806 pub reference: String,
807 pub tenant: Option<String>,
808 pub pack: Option<String>,
809 pub environment: Option<String>,
810}
811
812#[derive(Clone, Debug)]
813pub enum InjectedResolution {
814 Redirect(String),
815 WasmBytes {
816 resolved_digest: String,
817 wasm_bytes: Vec<u8>,
818 component_id: String,
819 abi_version: Option<String>,
820 source: ArtifactSource,
821 },
822 WasmPath {
823 resolved_digest: String,
824 wasm_path: PathBuf,
825 component_id: String,
826 abi_version: Option<String>,
827 source: ArtifactSource,
828 },
829}
830
831#[async_trait]
832pub trait ResolveRefInjector: Send + Sync {
833 async fn resolve(&self, reference: &str) -> Result<Option<InjectedResolution>, DistError>;
834}
835
836impl DistClient {
837 pub fn new(opts: DistOptions) -> Self {
838 Self::with_parts(opts, None, Arc::new(DefaultArtifactOpener))
839 }
840
841 pub fn with_ref_injector(opts: DistOptions, injector: Arc<dyn ResolveRefInjector>) -> Self {
842 Self::with_parts(opts, Some(injector), Arc::new(DefaultArtifactOpener))
843 }
844
845 pub fn with_artifact_opener(
846 opts: DistOptions,
847 artifact_opener: Arc<dyn ArtifactOpener>,
848 ) -> Self {
849 Self::with_parts(opts, None, artifact_opener)
850 }
851
852 #[allow(deprecated)]
853 #[deprecated(note = "use with_artifact_opener instead")]
854 pub fn with_bundle_opener<T: BundleOpener + 'static>(
855 opts: DistOptions,
856 bundle_opener: Arc<T>,
857 ) -> Self {
858 Self::with_artifact_opener(opts, bundle_opener)
859 }
860
861 fn with_parts(
862 opts: DistOptions,
863 injected: Option<Arc<dyn ResolveRefInjector>>,
864 artifact_opener: Arc<dyn ArtifactOpener>,
865 ) -> Self {
866 let oci_opts = ComponentResolveOptions {
867 allow_tags: opts.allow_tags,
868 offline: opts.offline,
869 cache_dir: opts.cache_dir.join("legacy-components"),
870 ..Default::default()
871 };
872 let http = reqwest::Client::builder()
873 .no_proxy()
874 .build()
875 .expect("failed to build http client");
876 Self {
877 cache: ComponentCache::new(opts.cache_dir.clone()),
878 oci: OciComponentResolver::new(oci_opts),
879 http,
880 opts,
881 injected,
882 artifact_opener,
883 }
884 }
885
886 async fn resolve_descriptor_from_reference(
887 &self,
888 reference: &str,
889 ) -> Result<ArtifactDescriptor, DistError> {
890 match classify_reference(reference)? {
891 RefKind::Digest(digest) => {
892 let entry = self.stat_cache(&digest)?;
893 Ok(descriptor_from_entry(&entry))
894 }
895 RefKind::Http(url) => {
896 if self.opts.offline {
897 return Err(DistError::Offline { reference: url });
898 }
899 let normalized = ensure_secure_http_url(&url, self.opts.allow_insecure_local_http)?;
900 Ok(ArtifactDescriptor {
901 artifact_type: ArtifactType::Component,
902 source_kind: ArtifactSourceKind::Https,
903 raw_ref: normalized.to_string(),
904 canonical_ref: normalized.to_string(),
905 digest: String::new(),
906 media_type: WASM_CONTENT_TYPE.to_string(),
907 size_bytes: 0,
908 created_at: None,
909 annotations: serde_json::Map::new(),
910 manifest_digest: None,
911 resolved_via: ResolvedVia::Https,
912 signature_refs: Vec::new(),
913 sbom_refs: Vec::new(),
914 })
915 }
916 RefKind::File(path) => {
917 let bytes = fs::read(&path).map_err(|source| DistError::CacheError {
918 path: path.display().to_string(),
919 source,
920 })?;
921 let digest = digest_for_bytes(&bytes);
922 Ok(ArtifactDescriptor {
923 artifact_type: ArtifactType::Component,
924 source_kind: ArtifactSourceKind::File,
925 raw_ref: path.display().to_string(),
926 canonical_ref: format!("file://{}@{}", path.display(), digest),
927 digest,
928 media_type: WASM_CONTENT_TYPE.to_string(),
929 size_bytes: bytes.len() as u64,
930 created_at: None,
931 annotations: serde_json::Map::new(),
932 manifest_digest: None,
933 resolved_via: ResolvedVia::File,
934 signature_refs: Vec::new(),
935 sbom_refs: Vec::new(),
936 })
937 }
938 RefKind::Oci(reference) => self.resolve_oci_descriptor(&reference).await,
939 RefKind::Repo(target) => {
940 if self.opts.repo_registry_base.is_none() {
941 return Err(DistError::ResolutionUnavailable {
942 reference: format!("repo://{target}"),
943 });
944 }
945 let mapped = map_registry_target(&target, self.opts.repo_registry_base.as_deref())
946 .ok_or_else(|| DistError::ResolutionUnavailable {
947 reference: format!("repo://{target}"),
948 })?;
949 let mut descriptor = self.resolve_oci_descriptor(&mapped).await?;
950 descriptor.source_kind = ArtifactSourceKind::Repo;
951 descriptor.raw_ref = format!("repo://{target}");
952 descriptor.resolved_via = ResolvedVia::RepoMapping;
953 Ok(descriptor)
954 }
955 RefKind::Store(target) => {
956 if is_greentic_biz_store_target(&target) {
957 return self.resolve_greentic_biz_store_descriptor(&target).await;
958 }
959 if self.opts.store_registry_base.is_none() {
960 return Err(DistError::ResolutionUnavailable {
961 reference: format!("store://{target}"),
962 });
963 }
964 let mapped = map_registry_target(&target, self.opts.store_registry_base.as_deref())
965 .ok_or_else(|| DistError::ResolutionUnavailable {
966 reference: format!("store://{target}"),
967 })?;
968 let mut descriptor = self.resolve_oci_descriptor(&mapped).await?;
969 descriptor.source_kind = ArtifactSourceKind::Store;
970 descriptor.raw_ref = format!("store://{target}");
971 descriptor.resolved_via = ResolvedVia::StoreMapping;
972 Ok(descriptor)
973 }
974 #[cfg(feature = "fixture-resolver")]
975 RefKind::Fixture(target) => {
976 let fixture_dir = self.opts.fixture_dir.as_ref().ok_or_else(|| {
977 DistError::InvalidInput("fixture:// requires fixture_dir".into())
978 })?;
979 let raw = target.trim_start_matches('/');
980 let candidate = if raw.ends_with(".wasm") {
981 fixture_dir.join(raw)
982 } else {
983 fixture_dir.join(format!("{raw}.wasm"))
984 };
985 let bytes = fs::read(&candidate).map_err(|source| DistError::CacheError {
986 path: candidate.display().to_string(),
987 source,
988 })?;
989 let digest = digest_for_bytes(&bytes);
990 Ok(ArtifactDescriptor {
991 artifact_type: ArtifactType::Component,
992 source_kind: ArtifactSourceKind::Fixture,
993 raw_ref: format!("fixture://{target}"),
994 canonical_ref: format!("file://{}@{}", candidate.display(), digest),
995 digest,
996 media_type: WASM_CONTENT_TYPE.to_string(),
997 size_bytes: bytes.len() as u64,
998 created_at: None,
999 annotations: serde_json::Map::new(),
1000 manifest_digest: None,
1001 resolved_via: ResolvedVia::Fixture,
1002 signature_refs: Vec::new(),
1003 sbom_refs: Vec::new(),
1004 })
1005 }
1006 }
1007 }
1008
1009 async fn resolve_oci_descriptor(
1010 &self,
1011 reference: &str,
1012 ) -> Result<ArtifactDescriptor, DistError> {
1013 self.resolve_oci_descriptor_with_client(reference, DefaultRegistryClient::default())
1014 .await
1015 }
1016
1017 async fn resolve_oci_descriptor_with_client(
1018 &self,
1019 reference: &str,
1020 client: DefaultRegistryClient,
1021 ) -> Result<ArtifactDescriptor, DistError> {
1022 let resolver = OciComponentResolver::with_client(client, self.oci_resolve_options());
1023 let resolved = resolver
1024 .resolve_descriptor(reference)
1025 .await
1026 .map_err(DistError::Oci)?;
1027 Ok(ArtifactDescriptor {
1028 artifact_type: ArtifactType::Component,
1029 source_kind: ArtifactSourceKind::Oci,
1030 raw_ref: format!("oci://{reference}"),
1031 canonical_ref: canonical_oci_ref(reference, &resolved.resolved_digest),
1032 digest: resolved.resolved_digest,
1033 media_type: normalize_content_type(Some(&resolved.media_type), WASM_CONTENT_TYPE),
1034 size_bytes: resolved.size_bytes,
1035 created_at: None,
1036 annotations: serde_json::Map::new(),
1037 manifest_digest: resolved.manifest_digest,
1038 resolved_via: if reference.contains(':') && !reference.contains("@sha256:") {
1039 ResolvedVia::TagResolution
1040 } else {
1041 ResolvedVia::Direct
1042 },
1043 signature_refs: Vec::new(),
1044 sbom_refs: Vec::new(),
1045 })
1046 }
1047
1048 fn oci_resolve_options(&self) -> ComponentResolveOptions {
1049 ComponentResolveOptions {
1050 allow_tags: self.opts.allow_tags,
1051 offline: self.opts.offline,
1052 cache_dir: self.opts.cache_dir.join("legacy-components"),
1053 ..Default::default()
1054 }
1055 }
1056
1057 async fn load_store_credentials(&self, tenant: &str) -> Result<StoreCredentials, DistError> {
1058 load_login(
1059 &self.opts.store_auth_path,
1060 &self.opts.store_state_path,
1061 tenant,
1062 )
1063 .await
1064 .map_err(|err| DistError::StoreAuth(err.to_string()))
1065 }
1066
1067 async fn greentic_biz_store_client(
1068 &self,
1069 tenant: &str,
1070 ) -> Result<DefaultRegistryClient, DistError> {
1071 let credentials = self.load_store_credentials(tenant).await?;
1072 Ok(DefaultRegistryClient::with_basic_auth(
1073 credentials.username,
1074 credentials.token,
1075 ))
1076 }
1077
1078 async fn resolve_greentic_biz_store_descriptor(
1079 &self,
1080 target: &str,
1081 ) -> Result<ArtifactDescriptor, DistError> {
1082 let parsed = parse_greentic_biz_store_target(target)?;
1083 let client = self.greentic_biz_store_client(&parsed.tenant).await?;
1084 let mut descriptor = self
1085 .resolve_oci_descriptor_with_client(&parsed.mapped_reference, client)
1086 .await?;
1087 descriptor.source_kind = ArtifactSourceKind::Store;
1088 descriptor.raw_ref = format!("store://{target}");
1089 descriptor.resolved_via = ResolvedVia::StoreMapping;
1090 Ok(descriptor)
1091 }
1092
1093 async fn pull_oci_with_source_and_client(
1094 &self,
1095 reference: &str,
1096 source: LegacyArtifactSource,
1097 component_id: String,
1098 client: DefaultRegistryClient,
1099 ) -> Result<ResolvedArtifact, DistError> {
1100 if self.opts.offline {
1101 return Err(DistError::Offline {
1102 reference: reference.to_string(),
1103 });
1104 }
1105 let resolver = OciComponentResolver::with_client(client, self.oci_resolve_options());
1106 let result = resolver
1107 .resolve_refs(&crate::oci_components::ComponentsExtension {
1108 refs: vec![reference.to_string()],
1109 mode: crate::oci_components::ComponentsMode::Eager,
1110 })
1111 .await
1112 .map_err(DistError::Oci)?;
1113 let resolved = result
1114 .into_iter()
1115 .next()
1116 .ok_or_else(|| DistError::InvalidRef {
1117 reference: reference.to_string(),
1118 })?;
1119 let resolved_digest = resolved.resolved_digest.clone();
1120 let resolved_bytes = fs::read(&resolved.path).map_err(|source| DistError::CacheError {
1121 path: resolved.path.display().to_string(),
1122 source,
1123 })?;
1124 let resolved = ResolvedArtifact::from_path(
1125 resolved_digest.clone(),
1126 self.cache
1127 .write_component(&resolved_digest, &resolved_bytes)
1128 .map_err(|source| DistError::CacheError {
1129 path: self
1130 .cache
1131 .component_path(&resolved_digest)
1132 .display()
1133 .to_string(),
1134 source,
1135 })?,
1136 resolve_component_id_from_cache(&resolved.path, &component_id),
1137 resolve_abi_version_from_cache(&resolved.path),
1138 resolve_describe_artifact_ref_from_cache(&resolved.path),
1139 file_size_if_exists(&resolved.path),
1140 Some(normalize_content_type(
1141 Some(&resolved.media_type),
1142 WASM_CONTENT_TYPE,
1143 )),
1144 resolved.fetched_from_network,
1145 source,
1146 );
1147 self.persist_cache_entry(&resolved)?;
1148 self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
1149 resolved.validate_payload()?;
1150 Ok(resolved)
1151 }
1152
1153 fn persist_cache_entry(&self, artifact: &ResolvedArtifact) -> Result<(), DistError> {
1154 let local_path = artifact
1155 .cache_path
1156 .clone()
1157 .or_else(|| artifact.wasm_path.clone())
1158 .unwrap_or_else(|| artifact.local_path.clone());
1159 let entry = CacheEntry {
1160 format_version: CACHE_FORMAT_VERSION,
1161 cache_key: artifact.cache_key.clone(),
1162 digest: artifact.descriptor.digest.clone(),
1163 media_type: artifact.descriptor.media_type.clone(),
1164 size_bytes: artifact.descriptor.size_bytes,
1165 artifact_type: artifact.descriptor.artifact_type.clone(),
1166 source_kind: artifact.descriptor.source_kind.clone(),
1167 raw_ref: artifact.descriptor.raw_ref.clone(),
1168 canonical_ref: artifact.descriptor.canonical_ref.clone(),
1169 fetched_at: artifact.fetched_at,
1170 last_accessed_at: unix_now(),
1171 last_verified_at: None,
1172 state: cache_entry_state_from_integrity(&artifact.integrity_state),
1173 advisory_epoch: None,
1174 signature_summary: None,
1175 local_path,
1176 source_snapshot: artifact.source_snapshot.clone(),
1177 };
1178 self.cache
1179 .write_entry(&entry)
1180 .map_err(|source| DistError::CacheError {
1181 path: self.cache.entry_path(&entry.digest).display().to_string(),
1182 source,
1183 })
1184 }
1185
1186 fn persist_verification_report(
1187 &self,
1188 digest: &str,
1189 report: &VerificationReport,
1190 ) -> Result<(), DistError> {
1191 let mut entry = self.stat_cache(digest)?;
1192 entry.last_verified_at = Some(unix_now());
1193 entry.advisory_epoch = report
1194 .advisory_version
1195 .as_ref()
1196 .and_then(|raw| raw.parse::<u64>().ok());
1197 entry.signature_summary = report
1198 .checks
1199 .iter()
1200 .find(|check| check.name == "signature_verified")
1201 .map(|check| {
1202 serde_json::json!({
1203 "outcome": verification_outcome_name(&check.outcome),
1204 "detail": check.detail,
1205 })
1206 });
1207 self.cache
1208 .write_entry(&entry)
1209 .map_err(|source| DistError::CacheError {
1210 path: self.cache.entry_path(&entry.digest).display().to_string(),
1211 source,
1212 })
1213 }
1214
1215 pub async fn resolve(
1216 &self,
1217 source: ArtifactSource,
1218 _policy: ResolvePolicy,
1219 ) -> Result<ArtifactDescriptor, DistError> {
1220 let mut current = source.raw_ref.clone();
1221 for _ in 0..8 {
1222 if let Some(injected) = &self.injected
1223 && let Some(result) = injected.resolve(¤t).await?
1224 {
1225 if let InjectedResolution::Redirect(next) = result {
1226 current = next;
1227 continue;
1228 }
1229 let artifact = self.materialize_injected(result)?;
1230 return Ok(artifact.descriptor);
1231 }
1232
1233 return self.resolve_descriptor_from_reference(¤t).await;
1234 }
1235 Err(DistError::InvalidInput(
1236 "too many injected redirect hops".to_string(),
1237 ))
1238 }
1239
1240 pub fn parse_source(&self, reference: &str) -> Result<ArtifactSource, DistError> {
1241 artifact_source_from_reference(reference, &self.opts)
1242 }
1243
1244 pub fn load_advisory_set(
1245 &self,
1246 bytes: &[u8],
1247 source: impl Into<String>,
1248 ) -> Result<AdvisorySet, DistError> {
1249 let mut advisory: AdvisorySet = serde_json::from_slice(bytes)?;
1250 advisory.source = source.into();
1251 Ok(advisory)
1252 }
1253
1254 pub fn apply_policy(
1255 &self,
1256 descriptor: &ArtifactDescriptor,
1257 advisory_set: Option<&AdvisorySet>,
1258 verification_policy: &VerificationPolicy,
1259 ) -> PreliminaryDecision {
1260 let checks = vec![
1261 check_digest_allowed(&descriptor.digest, advisory_set, verification_policy),
1262 check_media_type_allowed(&descriptor.media_type, verification_policy),
1263 check_issuer_allowed(
1264 issuer_from_descriptor(descriptor),
1265 advisory_set,
1266 verification_policy,
1267 ),
1268 check_operator_version_compatible(descriptor, advisory_set, verification_policy),
1269 ];
1270
1271 preliminary_decision_from_checks(checks)
1272 }
1273
1274 pub fn verify_artifact(
1275 &self,
1276 resolved_artifact: &ResolvedArtifact,
1277 advisory_set: Option<&AdvisorySet>,
1278 verification_policy: &VerificationPolicy,
1279 ) -> Result<VerificationReport, DistError> {
1280 let mut checks = self
1281 .apply_policy(
1282 &resolved_artifact.descriptor,
1283 advisory_set,
1284 verification_policy,
1285 )
1286 .checks;
1287
1288 checks.push(check_content_digest_match(resolved_artifact)?);
1289 checks.push(check_signature_present(
1290 &resolved_artifact.descriptor,
1291 verification_policy,
1292 ));
1293 checks.push(check_signature_verified(
1294 &resolved_artifact.descriptor,
1295 verification_policy,
1296 ));
1297 checks.push(check_sbom_present(
1298 &resolved_artifact.descriptor,
1299 verification_policy,
1300 ));
1301
1302 let report = verification_report_from_checks(
1303 &resolved_artifact.descriptor,
1304 advisory_set,
1305 verification_policy,
1306 self.stat_cache(&resolved_artifact.descriptor.digest)
1307 .ok()
1308 .as_ref(),
1309 checks,
1310 );
1311
1312 self.persist_verification_report(&resolved_artifact.descriptor.digest, &report)?;
1313 Ok(report)
1314 }
1315
1316 pub async fn stage_bundle(
1317 &self,
1318 input: &StageBundleInput,
1319 advisory_set: Option<&AdvisorySet>,
1320 verification_policy: &VerificationPolicy,
1321 cache_policy: CachePolicy,
1322 ) -> Result<StageBundleResult, IntegrationError> {
1323 let source = self
1324 .parse_source(&input.bundle_ref)
1325 .map_err(IntegrationError::from_dist_error)?;
1326 let descriptor = self
1327 .resolve(source, ResolvePolicy)
1328 .await
1329 .map_err(IntegrationError::from_dist_error)?;
1330 let resolved_artifact = self
1331 .fetch(&descriptor, cache_policy)
1332 .await
1333 .map_err(IntegrationError::from_dist_error)?;
1334 let verification_report = self
1335 .verify_artifact(&resolved_artifact, advisory_set, verification_policy)
1336 .map_err(IntegrationError::from_dist_error)?;
1337 if !verification_report.passed {
1338 return Err(IntegrationError::from_verification_report(
1339 verification_report,
1340 ));
1341 }
1342 let cache_entry = self
1343 .stat_cache(&descriptor.digest)
1344 .map_err(IntegrationError::from_dist_error)?;
1345 let bundle_id = bundle_id_for_digest(&descriptor.digest);
1346 self.persist_bundle_record(&BundleRecord {
1347 bundle_id: bundle_id.clone(),
1348 cache_key: cache_entry.cache_key.clone(),
1349 canonical_ref: descriptor.canonical_ref.clone(),
1350 source_kind: descriptor.source_kind.clone(),
1351 fetched_at: cache_entry.fetched_at,
1352 lifecycle_state: BundleLifecycleState::Staged,
1353 })
1354 .map_err(IntegrationError::from_dist_error)?;
1355
1356 Ok(StageBundleResult {
1357 bundle_id,
1358 canonical_ref: descriptor.canonical_ref.clone(),
1359 descriptor,
1360 resolved_artifact,
1361 verification_report,
1362 cache_entry,
1363 stage_audit_fields: StageAuditFields {
1364 staged_at: unix_now(),
1365 requested_access_mode: input.requested_access_mode.clone(),
1366 verification_policy_ref: input.verification_policy_ref.clone(),
1367 cache_policy_ref: input.cache_policy_ref.clone(),
1368 tenant: input.tenant.clone(),
1369 team: input.team.clone(),
1370 },
1371 })
1372 }
1373
1374 pub fn warm_bundle(
1375 &self,
1376 input: &WarmBundleInput,
1377 advisory_set: Option<&AdvisorySet>,
1378 verification_policy: &VerificationPolicy,
1379 ) -> Result<WarmBundleResult, IntegrationError> {
1380 let expected_bundle_id = bundle_id_for_digest(&input.cache_key);
1381 if input.bundle_id != expected_bundle_id {
1382 return Err(IntegrationError {
1383 code: IntegrationErrorCode::InvalidReference,
1384 summary: format!(
1385 "bundle id {} does not match cache key {}",
1386 input.bundle_id, input.cache_key
1387 ),
1388 retryable: false,
1389 details: Some(serde_json::json!({
1390 "bundle_id": input.bundle_id,
1391 "expected_bundle_id": expected_bundle_id,
1392 "cache_key": input.cache_key,
1393 })),
1394 });
1395 }
1396
1397 let mut resolved_artifact = self
1398 .open_cached(&input.cache_key)
1399 .map_err(IntegrationError::from_dist_error)?;
1400 if let Some(expected_operator_version) = &input.expected_operator_version {
1401 resolved_artifact.descriptor.annotations.insert(
1402 "operator_version".to_string(),
1403 serde_json::Value::String(expected_operator_version.clone()),
1404 );
1405 }
1406 let verification_report = self
1407 .verify_artifact(&resolved_artifact, advisory_set, verification_policy)
1408 .map_err(IntegrationError::from_dist_error)?;
1409 if !verification_report.passed {
1410 return Err(IntegrationError::from_verification_report(
1411 verification_report,
1412 ));
1413 }
1414 let opened = self.artifact_opener.open(
1415 &resolved_artifact,
1416 &ArtifactOpenRequest {
1417 bundle_id: input.bundle_id.clone(),
1418 dry_run: input.dry_run,
1419 smoke_test: input.smoke_test,
1420 },
1421 )?;
1422
1423 Ok(WarmBundleResult {
1424 bundle_id: input.bundle_id.clone(),
1425 bundle_manifest_summary: opened.bundle_manifest_summary,
1426 bundle_open_mode: opened.bundle_open_mode,
1427 warnings: verification_report
1428 .warnings
1429 .iter()
1430 .cloned()
1431 .chain(opened.warnings)
1432 .collect(),
1433 errors: verification_report.errors.clone(),
1434 verification_report,
1435 warm_audit_fields: WarmAuditFields {
1436 warmed_at: unix_now(),
1437 smoke_test: input.smoke_test,
1438 dry_run: input.dry_run,
1439 reopened_from_cache: true,
1440 },
1441 })
1442 }
1443
1444 pub fn rollback_bundle(
1445 &self,
1446 input: &RollbackBundleInput,
1447 advisory_set: Option<&AdvisorySet>,
1448 verification_policy: &VerificationPolicy,
1449 ) -> Result<RollbackBundleResult, IntegrationError> {
1450 let bundle_record = self.stat_bundle(&input.target_bundle_id).ok();
1451 let digest = if let Some(record) = &bundle_record {
1452 normalize_digest(&record.cache_key)
1453 } else {
1454 digest_from_bundle_id(&input.target_bundle_id).ok_or_else(|| IntegrationError {
1455 code: IntegrationErrorCode::InvalidReference,
1456 summary: format!("invalid bundle id {}", input.target_bundle_id),
1457 retryable: false,
1458 details: Some(serde_json::json!({
1459 "bundle_id": input.target_bundle_id,
1460 })),
1461 })?
1462 };
1463
1464 if let Some(expected_cache_key) = &input.expected_cache_key
1465 && expected_cache_key != &digest
1466 {
1467 return Err(IntegrationError {
1468 code: IntegrationErrorCode::InvalidReference,
1469 summary: format!(
1470 "expected cache key {} does not match rollback bundle digest {}",
1471 expected_cache_key, digest
1472 ),
1473 retryable: false,
1474 details: Some(serde_json::json!({
1475 "bundle_id": input.target_bundle_id,
1476 "expected_cache_key": expected_cache_key,
1477 "actual_cache_key": digest,
1478 })),
1479 });
1480 }
1481
1482 let resolved_artifact = self
1483 .open_cached(&digest)
1484 .map_err(IntegrationError::from_dist_error)?;
1485 let verification_report = self
1486 .verify_artifact(&resolved_artifact, advisory_set, verification_policy)
1487 .map_err(IntegrationError::from_dist_error)?;
1488 if !verification_report.passed {
1489 return Err(IntegrationError::from_verification_report(
1490 verification_report,
1491 ));
1492 }
1493 let cache_entry = self
1494 .stat_cache(&digest)
1495 .map_err(IntegrationError::from_dist_error)?;
1496
1497 Ok(RollbackBundleResult {
1498 bundle_id: input.target_bundle_id.clone(),
1499 reopened_from_cache: true,
1500 cache_entry,
1501 verification_report,
1502 rollback_audit_fields: RollbackAuditFields {
1503 rolled_back_at: unix_now(),
1504 reopened_from_cache: true,
1505 expected_cache_key: input.expected_cache_key.clone(),
1506 },
1507 })
1508 }
1509
1510 pub fn stat_bundle(&self, bundle_id: &str) -> Result<BundleRecord, DistError> {
1511 self.cache.read_bundle_record(bundle_id).map_err(|source| {
1512 if source.kind() == std::io::ErrorKind::NotFound {
1513 DistError::NotFound {
1514 reference: bundle_id.to_string(),
1515 }
1516 } else {
1517 DistError::CacheError {
1518 path: self
1519 .cache
1520 .bundle_record_path(bundle_id)
1521 .display()
1522 .to_string(),
1523 source,
1524 }
1525 }
1526 })
1527 }
1528
1529 pub fn list_bundles(&self) -> Result<Vec<BundleRecord>, DistError> {
1530 self.cache
1531 .list_bundle_records()
1532 .map_err(|source| DistError::CacheError {
1533 path: self.cache.bundle_records_root().display().to_string(),
1534 source,
1535 })
1536 }
1537
1538 pub fn set_bundle_state(
1539 &self,
1540 bundle_id: &str,
1541 lifecycle_state: BundleLifecycleState,
1542 ) -> Result<BundleRecord, DistError> {
1543 let mut record = self.stat_bundle(bundle_id)?;
1544 record.lifecycle_state = lifecycle_state;
1545 self.persist_bundle_record(&record)?;
1546 Ok(record)
1547 }
1548
1549 pub async fn fetch(
1550 &self,
1551 descriptor: &ArtifactDescriptor,
1552 _cache_policy: CachePolicy,
1553 ) -> Result<ResolvedArtifact, DistError> {
1554 if let Ok(existing) = self.open_cached(&descriptor.cache_key()) {
1555 return Ok(existing);
1556 }
1557
1558 let raw = descriptor.raw_ref.as_str();
1559 let artifact = match descriptor.source_kind {
1560 ArtifactSourceKind::CacheDigest => self.open_cached(&descriptor.digest)?,
1561 ArtifactSourceKind::Https => self.fetch_http(raw).await?,
1562 ArtifactSourceKind::File => self.ingest_file(Path::new(raw)).await?,
1563 ArtifactSourceKind::Oci => {
1564 let reference = descriptor
1565 .canonical_ref
1566 .trim_start_matches("oci://")
1567 .to_string();
1568 self.pull_oci(&reference).await?
1569 }
1570 ArtifactSourceKind::Repo => {
1571 self.resolve_repo_ref(raw.trim_start_matches("repo://"))
1572 .await?
1573 }
1574 ArtifactSourceKind::Store => {
1575 self.resolve_store_ref(raw.trim_start_matches("store://"))
1576 .await?
1577 }
1578 #[cfg(feature = "fixture-resolver")]
1579 ArtifactSourceKind::Fixture => {
1580 self.resolve_fixture_ref(raw.trim_start_matches("fixture://"))
1581 .await?
1582 }
1583 #[cfg(not(feature = "fixture-resolver"))]
1584 ArtifactSourceKind::Fixture => {
1585 return Err(DistError::InvalidInput(
1586 "fixture resolver feature is disabled".to_string(),
1587 ));
1588 }
1589 };
1590
1591 if !descriptor.digest.is_empty() && artifact.descriptor.digest != descriptor.digest {
1592 return Err(DistError::CorruptArtifact {
1593 reference: descriptor.digest.clone(),
1594 reason: format!(
1595 "fetched digest {} did not match resolved descriptor {}",
1596 artifact.descriptor.digest, descriptor.digest
1597 ),
1598 });
1599 }
1600 Ok(artifact)
1601 }
1602
1603 pub fn open_cached(&self, digest_or_cache_key: &str) -> Result<ResolvedArtifact, DistError> {
1604 let digest = normalize_digest(digest_or_cache_key);
1605 let entry = self.stat_cache(&digest)?;
1606 let path = self
1607 .cache
1608 .existing_component(&entry.digest)
1609 .ok_or_else(|| DistError::CorruptArtifact {
1610 reference: digest_or_cache_key.to_string(),
1611 reason: "cache metadata exists but cached blob is missing".to_string(),
1612 })?;
1613 let mut artifact = ResolvedArtifact::from_path(
1614 entry.digest.clone(),
1615 path.clone(),
1616 component_id_from_descriptor(&entry),
1617 None,
1618 None,
1619 Some(entry.size_bytes),
1620 Some(entry.media_type.clone()),
1621 false,
1622 legacy_source_from_entry(&entry),
1623 );
1624 artifact.descriptor = descriptor_from_entry(&entry);
1625 artifact.cache_key = entry.cache_key.clone();
1626 artifact.local_path = path.clone();
1627 artifact.fetched_at = entry.fetched_at;
1628 artifact.integrity_state = integrity_state_from_entry(&entry.state);
1629 artifact.source_snapshot = entry.source_snapshot.clone();
1630 artifact.cache_path = Some(path.clone());
1631 artifact.wasm_path = Some(path);
1632 Ok(artifact)
1633 }
1634
1635 pub fn stat_cache(&self, digest_or_cache_key: &str) -> Result<CacheEntry, DistError> {
1636 let digest = normalize_digest(digest_or_cache_key);
1637 let entry = self.cache.read_entry(&digest).map_err(|source| {
1638 if source.kind() == std::io::ErrorKind::NotFound {
1639 DistError::NotFound {
1640 reference: digest_or_cache_key.to_string(),
1641 }
1642 } else {
1643 DistError::CacheError {
1644 path: self.cache.entry_path(&digest).display().to_string(),
1645 source,
1646 }
1647 }
1648 })?;
1649 let _ = self.cache.touch_last_used(&entry.digest);
1650 Ok(entry)
1651 }
1652
1653 #[deprecated(note = "use evaluate_retention/apply_retention for explicit retention decisions")]
1654 pub fn evict_cache(&self, digests: &[String]) -> Result<RetentionReport, DistError> {
1655 let mut report = RetentionReport {
1656 scanned_entries: digests.len(),
1657 ..RetentionReport::default()
1658 };
1659 let entries = digests
1660 .iter()
1661 .filter_map(|digest| self.stat_cache(digest).ok())
1662 .collect::<Vec<_>>();
1663 let input = RetentionInput {
1664 entries,
1665 active_bundle_ids: Vec::new(),
1666 staged_bundle_ids: Vec::new(),
1667 warming_bundle_ids: Vec::new(),
1668 ready_bundle_ids: Vec::new(),
1669 draining_bundle_ids: Vec::new(),
1670 session_referenced_bundle_ids: Vec::new(),
1671 max_cache_bytes: 0,
1672 max_entry_age: Some(0),
1673 minimum_rollback_depth: 0,
1674 environment: RetentionEnvironment::Dev,
1675 };
1676 let outcome = self.apply_retention(&input)?;
1677 report.evicted = outcome.report.evicted;
1678 report.bytes_reclaimed = outcome.report.bytes_reclaimed;
1679 report.refusals = digests
1680 .iter()
1681 .filter(|digest| self.stat_cache(digest).is_err())
1682 .cloned()
1683 .chain(outcome.report.refusals)
1684 .collect();
1685 report.kept = report
1686 .scanned_entries
1687 .saturating_sub(report.evicted + report.refusals.len());
1688 Ok(report)
1689 }
1690
1691 #[deprecated(note = "use parse_source + resolve + fetch instead")]
1692 pub async fn resolve_ref(&self, reference: &str) -> Result<ResolvedArtifact, DistError> {
1693 let source = self.parse_source(reference)?;
1694 let descriptor = self.resolve(source, ResolvePolicy).await?;
1695 self.fetch(&descriptor, CachePolicy).await
1696 }
1697
1698 #[allow(deprecated)]
1699 #[deprecated(note = "use parse_source + resolve + fetch instead")]
1700 pub async fn resolve_ref_request(
1701 &self,
1702 req: ResolveRefRequest,
1703 ) -> Result<ResolvedArtifact, DistError> {
1704 self.resolve_ref(&req.reference).await
1705 }
1706
1707 #[allow(deprecated)]
1708 #[deprecated(note = "use parse_source + resolve + fetch instead")]
1709 pub async fn resolve_component(
1710 &self,
1711 req: ResolveComponentRequest,
1712 ) -> Result<ResolvedArtifact, DistError> {
1713 self.resolve_ref(&req.reference).await
1714 }
1715
1716 #[allow(deprecated)]
1717 #[deprecated(note = "use parse_source + resolve + fetch or open_cached instead")]
1718 pub async fn ensure_cached(&self, reference: &str) -> Result<ResolvedArtifact, DistError> {
1719 let resolved = self.resolve_ref(reference).await?;
1720 if resolved.wasm_bytes.is_some() {
1721 return Ok(resolved);
1722 }
1723 if let Some(path) = &resolved.wasm_path
1724 && path.exists()
1725 {
1726 return Ok(resolved);
1727 }
1728 Err(DistError::NotFound {
1729 reference: reference.to_string(),
1730 })
1731 }
1732
1733 pub async fn fetch_digest(&self, digest: &str) -> Result<PathBuf, DistError> {
1734 let normalized = normalize_digest(digest);
1735 self.cache
1736 .existing_component(&normalized)
1737 .ok_or(DistError::NotFound {
1738 reference: normalized,
1739 })
1740 }
1741
1742 pub async fn pull_lock(&self, lock_path: &Path) -> Result<Vec<ResolvedArtifact>, DistError> {
1743 let contents = fs::read_to_string(lock_path).map_err(|source| DistError::CacheError {
1744 path: lock_path.display().to_string(),
1745 source,
1746 })?;
1747 let entries = parse_lockfile(&contents)?;
1748 let mut resolved = Vec::with_capacity(entries.len());
1749 for entry in entries {
1750 let resolved_item = if let Some(digest) = entry.digest.as_ref() {
1751 if let Ok(item) = self.open_cached(digest) {
1752 item
1753 } else {
1754 let reference = entry
1755 .reference
1756 .clone()
1757 .ok_or_else(|| DistError::InvalidInput("lock entry missing ref".into()))?;
1758 let source = self.parse_source(&reference)?;
1759 let mut descriptor = self.resolve(source, ResolvePolicy).await?;
1760 if !descriptor.digest.is_empty() && descriptor.digest != *digest {
1761 return Err(DistError::CorruptArtifact {
1762 reference: reference.clone(),
1763 reason: format!(
1764 "lock digest {} did not match resolved descriptor {}",
1765 digest, descriptor.digest
1766 ),
1767 });
1768 }
1769 descriptor.digest = digest.clone();
1770 self.fetch(&descriptor, CachePolicy).await?
1771 }
1772 } else {
1773 let reference = entry
1774 .reference
1775 .clone()
1776 .ok_or_else(|| DistError::InvalidInput("lock entry missing ref".into()))?;
1777 let source = self.parse_source(&reference)?;
1778 let descriptor = self.resolve(source, ResolvePolicy).await?;
1779 self.fetch(&descriptor, CachePolicy).await?
1780 };
1781 resolved.push(resolved_item);
1782 }
1783 Ok(resolved)
1784 }
1785
1786 pub fn list_cache(&self) -> Vec<String> {
1787 self.cache.list_digests()
1788 }
1789
1790 pub fn list_cache_entries(&self) -> Vec<CacheEntry> {
1791 self.cache
1792 .list_digests()
1793 .into_iter()
1794 .filter_map(|digest| self.stat_cache(&digest).ok())
1795 .collect()
1796 }
1797
1798 pub fn evaluate_retention(
1799 &self,
1800 input: &RetentionInput,
1801 ) -> Result<RetentionOutcome, DistError> {
1802 let decisions = retention_decisions(input);
1803 let mut report = RetentionReport {
1804 scanned_entries: decisions.len(),
1805 ..RetentionReport::default()
1806 };
1807 for decision in &decisions {
1808 match decision.decision {
1809 RetentionDisposition::Keep => report.kept += 1,
1810 RetentionDisposition::Evict => report.evicted += 1,
1811 RetentionDisposition::Protect => report.protected += 1,
1812 }
1813 }
1814 Ok(RetentionOutcome { decisions, report })
1815 }
1816
1817 pub fn apply_retention(&self, input: &RetentionInput) -> Result<RetentionOutcome, DistError> {
1818 let mut outcome = self.evaluate_retention(input)?;
1819 let decision_map = input
1820 .entries
1821 .iter()
1822 .map(|entry| (entry.cache_key.clone(), entry.size_bytes))
1823 .collect::<std::collections::BTreeMap<_, _>>();
1824 let evicted = outcome
1825 .decisions
1826 .iter()
1827 .filter(|decision| matches!(decision.decision, RetentionDisposition::Evict))
1828 .map(|decision| decision.cache_key.clone())
1829 .collect::<Vec<_>>();
1830 for cache_key in evicted {
1831 let digest = normalize_digest(&cache_key);
1832 let dir = self.cache.component_dir(&digest);
1833 if dir.exists() {
1834 fs::remove_dir_all(&dir).map_err(|source| DistError::CacheError {
1835 path: dir.display().to_string(),
1836 source,
1837 })?;
1838 self.cache
1839 .remove_bundle_record(&bundle_id_for_digest(&digest))
1840 .ok();
1841 outcome.report.bytes_reclaimed +=
1842 decision_map.get(&cache_key).copied().unwrap_or(0);
1843 } else {
1844 outcome.report.refusals.push(cache_key);
1845 }
1846 }
1847 outcome.report.kept = outcome
1848 .decisions
1849 .iter()
1850 .filter(|decision| matches!(decision.decision, RetentionDisposition::Keep))
1851 .count();
1852 outcome.report.protected = outcome
1853 .decisions
1854 .iter()
1855 .filter(|decision| matches!(decision.decision, RetentionDisposition::Protect))
1856 .count();
1857 outcome.report.evicted = outcome
1858 .decisions
1859 .iter()
1860 .filter(|decision| matches!(decision.decision, RetentionDisposition::Evict))
1861 .count()
1862 .saturating_sub(outcome.report.refusals.len());
1863 Ok(outcome)
1864 }
1865
1866 fn persist_bundle_record(&self, record: &BundleRecord) -> Result<(), DistError> {
1867 self.cache
1868 .write_bundle_record(record)
1869 .map_err(|source| DistError::CacheError {
1870 path: self
1871 .cache
1872 .bundle_record_path(&record.bundle_id)
1873 .display()
1874 .to_string(),
1875 source,
1876 })
1877 }
1878
1879 #[deprecated(note = "use evict_cache or apply_retention instead")]
1880 pub fn remove_cached(&self, digests: &[String]) -> Result<(), DistError> {
1881 for digest in digests {
1882 for dir in [
1883 self.cache.component_dir(digest),
1884 self.cache.legacy_component_dir(digest),
1885 ] {
1886 if dir.exists() {
1887 fs::remove_dir_all(&dir).map_err(|source| DistError::CacheError {
1888 path: dir.display().to_string(),
1889 source,
1890 })?;
1891 }
1892 }
1893 self.cache
1894 .remove_bundle_record(&bundle_id_for_digest(digest))
1895 .ok();
1896 }
1897 Ok(())
1898 }
1899
1900 #[deprecated(note = "use evaluate_retention/apply_retention for cache lifecycle management")]
1901 pub fn gc(&self) -> Result<Vec<String>, DistError> {
1902 let mut removed = Vec::new();
1903 for digest in self.cache.list_digests() {
1904 let primary = self.cache.component_path(&digest);
1905 let legacy = self.cache.legacy_component_path(&digest);
1906 if !primary.exists() && !legacy.exists() {
1907 let dir = self.cache.component_dir(&digest);
1908 let legacy_dir = self.cache.legacy_component_dir(&digest);
1909 fs::remove_dir_all(&dir).ok();
1910 fs::remove_dir_all(&legacy_dir).ok();
1911 self.cache
1912 .remove_bundle_record(&bundle_id_for_digest(&digest))
1913 .ok();
1914 removed.push(digest);
1915 }
1916 }
1917 for record in self.list_bundles()? {
1918 if self.stat_cache(&record.cache_key).is_err() {
1919 self.cache.remove_bundle_record(&record.bundle_id).ok();
1920 removed.push(record.cache_key);
1921 }
1922 }
1923 removed.sort();
1924 removed.dedup();
1925 Ok(removed)
1926 }
1927
1928 async fn fetch_http(&self, url: &str) -> Result<ResolvedArtifact, DistError> {
1929 if self.opts.offline {
1930 return Err(DistError::Offline {
1931 reference: url.to_string(),
1932 });
1933 }
1934 let request_url = ensure_secure_http_url(url, self.opts.allow_insecure_local_http)?;
1935 let bytes = self
1936 .http
1937 .get(request_url.clone())
1938 .send()
1939 .await
1940 .map_err(|err| DistError::Network(err.to_string()))?;
1941 let response = bytes
1942 .error_for_status()
1943 .map_err(|err| DistError::Network(err.to_string()))?;
1944 let content_type = response
1945 .headers()
1946 .get(reqwest::header::CONTENT_TYPE)
1947 .and_then(|v| v.to_str().ok())
1948 .map(str::to_string)
1949 .or_else(|| Some(WASM_CONTENT_TYPE.to_string()));
1950 let bytes = response
1951 .bytes()
1952 .await
1953 .map_err(|err| DistError::Network(err.to_string()))?;
1954 let digest = digest_for_bytes(&bytes);
1955 let path = self
1956 .cache
1957 .write_component(&digest, &bytes)
1958 .map_err(|source| DistError::CacheError {
1959 path: self.cache.component_path(&digest).display().to_string(),
1960 source,
1961 })?;
1962 let resolved = ResolvedArtifact::from_path(
1963 digest,
1964 path,
1965 component_id_from_ref(&RefKind::Http(request_url.to_string())),
1966 None,
1967 None,
1968 Some(bytes.len() as u64),
1969 content_type,
1970 true,
1971 LegacyArtifactSource::Http(request_url.to_string()),
1972 );
1973 self.persist_cache_entry(&resolved)?;
1974 self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
1975 resolved.validate_payload()?;
1976 Ok(resolved)
1977 }
1978
1979 async fn ingest_file(&self, path: &Path) -> Result<ResolvedArtifact, DistError> {
1980 let bytes = fs::read(path).map_err(|source| DistError::CacheError {
1981 path: path.display().to_string(),
1982 source,
1983 })?;
1984 let digest = digest_for_bytes(&bytes);
1985 let cached = self
1986 .cache
1987 .write_component(&digest, &bytes)
1988 .map_err(|source| DistError::CacheError {
1989 path: self.cache.component_path(&digest).display().to_string(),
1990 source,
1991 })?;
1992 let resolved = ResolvedArtifact::from_path(
1993 digest,
1994 cached,
1995 component_id_from_ref(&RefKind::File(path.to_path_buf())),
1996 None,
1997 source_sidecar_describe_ref(path),
1998 Some(bytes.len() as u64),
1999 Some(WASM_CONTENT_TYPE.to_string()),
2000 true,
2001 LegacyArtifactSource::File(path.to_path_buf()),
2002 );
2003 self.persist_cache_entry(&resolved)?;
2004 self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
2005 resolved.validate_payload()?;
2006 Ok(resolved)
2007 }
2008
2009 async fn pull_oci(&self, reference: &str) -> Result<ResolvedArtifact, DistError> {
2010 let component_id = component_id_from_ref(&RefKind::Oci(reference.to_string()));
2011 self.pull_oci_with_source(
2012 reference,
2013 LegacyArtifactSource::Oci(reference.to_string()),
2014 component_id,
2015 )
2016 .await
2017 }
2018
2019 async fn pull_oci_with_source(
2020 &self,
2021 reference: &str,
2022 source: LegacyArtifactSource,
2023 component_id: String,
2024 ) -> Result<ResolvedArtifact, DistError> {
2025 if self.opts.offline {
2026 return Err(DistError::Offline {
2027 reference: reference.to_string(),
2028 });
2029 }
2030 let result = self
2031 .oci
2032 .resolve_refs(&crate::oci_components::ComponentsExtension {
2033 refs: vec![reference.to_string()],
2034 mode: crate::oci_components::ComponentsMode::Eager,
2035 })
2036 .await
2037 .map_err(DistError::Oci)?;
2038 let resolved = result
2039 .into_iter()
2040 .next()
2041 .ok_or_else(|| DistError::InvalidRef {
2042 reference: reference.to_string(),
2043 })?;
2044 let resolved_digest = resolved.resolved_digest.clone();
2045 let resolved_bytes = fs::read(&resolved.path).map_err(|source| DistError::CacheError {
2046 path: resolved.path.display().to_string(),
2047 source,
2048 })?;
2049 let resolved = ResolvedArtifact::from_path(
2050 resolved_digest.clone(),
2051 self.cache
2052 .write_component(&resolved_digest, &resolved_bytes)
2053 .map_err(|source| DistError::CacheError {
2054 path: self
2055 .cache
2056 .component_path(&resolved_digest)
2057 .display()
2058 .to_string(),
2059 source,
2060 })?,
2061 resolve_component_id_from_cache(&resolved.path, &component_id),
2062 resolve_abi_version_from_cache(&resolved.path),
2063 resolve_describe_artifact_ref_from_cache(&resolved.path),
2064 file_size_if_exists(&resolved.path),
2065 Some(normalize_content_type(
2066 Some(&resolved.media_type),
2067 WASM_CONTENT_TYPE,
2068 )),
2069 resolved.fetched_from_network,
2070 source,
2071 );
2072 self.persist_cache_entry(&resolved)?;
2073 self.enforce_cache_cap(Some(&resolved.descriptor.digest))?;
2074 resolved.validate_payload()?;
2075 Ok(resolved)
2076 }
2077
2078 async fn resolve_repo_ref(&self, target: &str) -> Result<ResolvedArtifact, DistError> {
2079 if self.opts.repo_registry_base.is_none() {
2080 return Err(DistError::ResolutionUnavailable {
2081 reference: format!("repo://{target}"),
2082 });
2083 }
2084 let mapped = map_registry_target(target, self.opts.repo_registry_base.as_deref())
2085 .ok_or_else(|| DistError::Unauthorized {
2086 target: format!("repo://{target}"),
2087 })?;
2088 self.pull_oci_with_source(
2089 &mapped,
2090 LegacyArtifactSource::Repo(format!("repo://{target}")),
2091 target.to_string(),
2092 )
2093 .await
2094 }
2095
2096 async fn resolve_store_ref(&self, target: &str) -> Result<ResolvedArtifact, DistError> {
2097 if is_greentic_biz_store_target(target) {
2098 let parsed = parse_greentic_biz_store_target(target)?;
2099 let client = self.greentic_biz_store_client(&parsed.tenant).await?;
2100 return self
2101 .pull_oci_with_source_and_client(
2102 &parsed.mapped_reference,
2103 LegacyArtifactSource::Store(format!("store://{target}")),
2104 target.to_string(),
2105 client,
2106 )
2107 .await;
2108 }
2109 if self.opts.store_registry_base.is_none() {
2110 return Err(DistError::ResolutionUnavailable {
2111 reference: format!("store://{target}"),
2112 });
2113 }
2114 let mapped = map_registry_target(target, self.opts.store_registry_base.as_deref())
2115 .ok_or_else(|| DistError::Unauthorized {
2116 target: format!("store://{target}"),
2117 })?;
2118 self.pull_oci_with_source(
2119 &mapped,
2120 LegacyArtifactSource::Store(format!("store://{target}")),
2121 target.to_string(),
2122 )
2123 .await
2124 }
2125
2126 #[cfg(feature = "fixture-resolver")]
2127 async fn resolve_fixture_ref(&self, target: &str) -> Result<ResolvedArtifact, DistError> {
2128 let fixture_dir = self
2129 .opts
2130 .fixture_dir
2131 .as_ref()
2132 .ok_or_else(|| DistError::InvalidInput("fixture:// requires fixture_dir".into()))?;
2133 let raw = target.trim_start_matches('/');
2134 let candidate = if raw.ends_with(".wasm") {
2135 fixture_dir.join(raw)
2136 } else {
2137 fixture_dir.join(format!("{raw}.wasm"))
2138 };
2139 if !candidate.exists() {
2140 return Err(DistError::NotFound {
2141 reference: format!("fixture://{target}"),
2142 });
2143 }
2144 self.ingest_file(&candidate).await
2145 }
2146
2147 fn materialize_injected(
2148 &self,
2149 resolved: InjectedResolution,
2150 ) -> Result<ResolvedArtifact, DistError> {
2151 let artifact = match resolved {
2152 InjectedResolution::Redirect(_) => {
2153 return Err(DistError::InvalidInput(
2154 "unexpected redirect during materialization".into(),
2155 ));
2156 }
2157 InjectedResolution::WasmBytes {
2158 resolved_digest,
2159 wasm_bytes,
2160 component_id,
2161 abi_version,
2162 source,
2163 } => {
2164 let legacy_source = legacy_source_from_public(&source);
2165 let path = self
2166 .cache
2167 .write_component(&resolved_digest, &wasm_bytes)
2168 .map_err(|source| DistError::CacheError {
2169 path: self
2170 .cache
2171 .component_path(&resolved_digest)
2172 .display()
2173 .to_string(),
2174 source,
2175 })?;
2176 ResolvedArtifact::from_path(
2177 resolved_digest,
2178 path,
2179 component_id,
2180 abi_version,
2181 None,
2182 Some(wasm_bytes.len() as u64),
2183 Some(WASM_CONTENT_TYPE.to_string()),
2184 true,
2185 legacy_source,
2186 )
2187 }
2188 InjectedResolution::WasmPath {
2189 resolved_digest,
2190 wasm_path,
2191 component_id,
2192 abi_version,
2193 source,
2194 } => {
2195 let legacy_source = legacy_source_from_public(&source);
2196 ResolvedArtifact::from_path(
2197 resolved_digest,
2198 wasm_path.clone(),
2199 component_id,
2200 abi_version,
2201 resolve_describe_artifact_ref_from_cache(&wasm_path),
2202 file_size_if_exists(&wasm_path),
2203 Some(WASM_CONTENT_TYPE.to_string()),
2204 false,
2205 legacy_source,
2206 )
2207 }
2208 };
2209 self.persist_cache_entry(&artifact)?;
2210 self.enforce_cache_cap(Some(&artifact.descriptor.digest))?;
2211 artifact.validate_payload()?;
2212 Ok(artifact)
2213 }
2214
2215 fn enforce_cache_cap(&self, current_digest: Option<&str>) -> Result<(), DistError> {
2216 let bundle_records = self.list_bundles()?;
2217 let mut staged_bundle_ids = bundle_records
2218 .iter()
2219 .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Staged))
2220 .map(|record| record.bundle_id.clone())
2221 .collect::<Vec<_>>();
2222 let warming_bundle_ids = bundle_records
2223 .iter()
2224 .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Warming))
2225 .map(|record| record.bundle_id.clone())
2226 .collect::<Vec<_>>();
2227 let ready_bundle_ids = bundle_records
2228 .iter()
2229 .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Ready))
2230 .map(|record| record.bundle_id.clone())
2231 .collect::<Vec<_>>();
2232 let draining_bundle_ids = bundle_records
2233 .iter()
2234 .filter(|record| matches!(record.lifecycle_state, BundleLifecycleState::Draining))
2235 .map(|record| record.bundle_id.clone())
2236 .collect::<Vec<_>>();
2237 if let Some(current_digest) = current_digest {
2238 staged_bundle_ids.push(bundle_id_for_digest(current_digest));
2239 }
2240 self.apply_retention(&RetentionInput {
2241 entries: self.list_cache_entries(),
2242 active_bundle_ids: Vec::new(),
2243 staged_bundle_ids,
2244 warming_bundle_ids,
2245 ready_bundle_ids,
2246 draining_bundle_ids,
2247 session_referenced_bundle_ids: Vec::new(),
2248 max_cache_bytes: self.opts.cache_max_bytes,
2249 max_entry_age: None,
2250 minimum_rollback_depth: 0,
2251 environment: RetentionEnvironment::Dev,
2252 })
2253 .map(|_| ())
2254 }
2255
2256 pub async fn pull_oci_with_details(
2257 &self,
2258 reference: &str,
2259 ) -> Result<OciCacheInspection, DistError> {
2260 if self.opts.offline {
2261 return Err(DistError::Offline {
2262 reference: reference.to_string(),
2263 });
2264 }
2265 let result = self
2266 .oci
2267 .resolve_refs(&crate::oci_components::ComponentsExtension {
2268 refs: vec![reference.to_string()],
2269 mode: crate::oci_components::ComponentsMode::Eager,
2270 })
2271 .await
2272 .map_err(DistError::Oci)?;
2273 let resolved = result
2274 .into_iter()
2275 .next()
2276 .ok_or_else(|| DistError::InvalidRef {
2277 reference: reference.to_string(),
2278 })?;
2279 let cache_dir = resolved
2280 .path
2281 .parent()
2282 .map(|p| p.to_path_buf())
2283 .ok_or_else(|| DistError::InvalidInput("cache path missing parent".into()))?;
2284 Ok(OciCacheInspection {
2285 digest: resolved.resolved_digest,
2286 cache_dir,
2287 selected_media_type: resolved.media_type,
2288 fetched: resolved.fetched_from_network,
2289 })
2290 }
2291}
2292
2293fn default_distribution_cache_root() -> PathBuf {
2294 if let Ok(root) = std::env::var("GREENTIC_HOME") {
2295 return PathBuf::from(root).join("cache").join("distribution");
2296 }
2297 if let Ok(home) = std::env::var("HOME") {
2298 return PathBuf::from(home)
2299 .join(".greentic")
2300 .join("cache")
2301 .join("distribution");
2302 }
2303 PathBuf::from(".greentic")
2304 .join("cache")
2305 .join("distribution")
2306}
2307
2308#[derive(Debug, Error)]
2309pub enum DistError {
2310 #[error("invalid reference `{reference}`")]
2311 InvalidRef { reference: String },
2312 #[error("reference `{reference}` not found")]
2313 NotFound { reference: String },
2314 #[error("unauthorized for `{target}`")]
2315 Unauthorized { target: String },
2316 #[error("resolution unavailable for `{reference}`")]
2317 ResolutionUnavailable { reference: String },
2318 #[error("network error: {0}")]
2319 Network(String),
2320 #[error("corrupt artifact `{reference}`: {reason}")]
2321 CorruptArtifact { reference: String, reason: String },
2322 #[error("unsupported abi `{abi}`")]
2323 UnsupportedAbi { abi: String },
2324 #[error("cache error at `{path}`: {source}")]
2325 CacheError {
2326 path: String,
2327 #[source]
2328 source: std::io::Error,
2329 },
2330 #[error("invalid input: {0}")]
2331 InvalidInput(String),
2332 #[error("insecure url `{url}`: only https is allowed")]
2333 InsecureUrl { url: String },
2334 #[error("offline mode forbids fetching `{reference}`")]
2335 Offline { reference: String },
2336 #[error("store auth error: {0}")]
2337 StoreAuth(String),
2338 #[error("oci error: {0}")]
2339 Oci(#[from] crate::oci_components::OciComponentError),
2340 #[error("invalid lockfile: {0}")]
2341 Serde(#[from] serde_json::Error),
2342}
2343
2344impl DistError {
2345 pub fn exit_code(&self) -> i32 {
2346 match self {
2347 DistError::InvalidRef { .. }
2348 | DistError::InvalidInput(_)
2349 | DistError::InsecureUrl { .. }
2350 | DistError::Serde(_) => 2,
2351 DistError::NotFound { .. } => 3,
2352 DistError::Offline { .. } => 4,
2353 DistError::Unauthorized { .. }
2354 | DistError::ResolutionUnavailable { .. }
2355 | DistError::StoreAuth(_) => 5,
2356 _ => 10,
2357 }
2358 }
2359}
2360
2361impl IntegrationError {
2362 fn from_dist_error(error: DistError) -> Self {
2363 match error {
2364 DistError::InvalidRef { reference } => Self {
2365 code: IntegrationErrorCode::InvalidReference,
2366 summary: format!("invalid reference `{reference}`"),
2367 retryable: false,
2368 details: Some(serde_json::json!({ "reference": reference })),
2369 },
2370 DistError::NotFound { reference } => Self {
2371 code: IntegrationErrorCode::CacheMiss,
2372 summary: format!("artifact `{reference}` was not found"),
2373 retryable: true,
2374 details: Some(serde_json::json!({ "reference": reference })),
2375 },
2376 DistError::Unauthorized { target } => Self {
2377 code: IntegrationErrorCode::ResolutionFailed,
2378 summary: format!("unauthorized for `{target}`"),
2379 retryable: false,
2380 details: Some(serde_json::json!({ "target": target })),
2381 },
2382 DistError::ResolutionUnavailable { reference } => Self {
2383 code: IntegrationErrorCode::ResolutionUnavailable,
2384 summary: format!("resolution unavailable for `{reference}`"),
2385 retryable: false,
2386 details: Some(serde_json::json!({ "reference": reference })),
2387 },
2388 DistError::Network(summary) => Self {
2389 code: IntegrationErrorCode::DownloadFailed,
2390 summary,
2391 retryable: true,
2392 details: None,
2393 },
2394 DistError::CorruptArtifact { reference, reason } => Self {
2395 code: IntegrationErrorCode::CacheCorrupt,
2396 summary: format!("corrupt artifact `{reference}`: {reason}"),
2397 retryable: false,
2398 details: Some(serde_json::json!({ "reference": reference, "reason": reason })),
2399 },
2400 DistError::UnsupportedAbi { abi } => Self {
2401 code: IntegrationErrorCode::UnsupportedArtifactType,
2402 summary: format!("unsupported abi `{abi}`"),
2403 retryable: false,
2404 details: Some(serde_json::json!({ "abi": abi })),
2405 },
2406 DistError::CacheError { path, source } => Self {
2407 code: IntegrationErrorCode::BundleOpenFailed,
2408 summary: format!("cache error at `{path}`: {source}"),
2409 retryable: true,
2410 details: Some(serde_json::json!({ "path": path })),
2411 },
2412 DistError::InvalidInput(summary) => Self {
2413 code: IntegrationErrorCode::PolicyInputInvalid,
2414 summary,
2415 retryable: false,
2416 details: None,
2417 },
2418 DistError::InsecureUrl { url } => Self {
2419 code: IntegrationErrorCode::UnsupportedSource,
2420 summary: format!("insecure url `{url}`: only https is allowed"),
2421 retryable: false,
2422 details: Some(serde_json::json!({ "url": url })),
2423 },
2424 DistError::Offline { reference } => Self {
2425 code: IntegrationErrorCode::OfflineRequiredButUnavailable,
2426 summary: format!("offline mode forbids fetching `{reference}`"),
2427 retryable: true,
2428 details: Some(serde_json::json!({ "reference": reference })),
2429 },
2430 DistError::StoreAuth(summary) => Self {
2431 code: IntegrationErrorCode::ResolutionFailed,
2432 summary,
2433 retryable: false,
2434 details: None,
2435 },
2436 DistError::Oci(source) => Self {
2437 code: IntegrationErrorCode::ResolutionFailed,
2438 summary: source.to_string(),
2439 retryable: true,
2440 details: None,
2441 },
2442 DistError::Serde(source) => Self {
2443 code: IntegrationErrorCode::DescriptorCorrupt,
2444 summary: format!("invalid serialized input: {source}"),
2445 retryable: false,
2446 details: None,
2447 },
2448 }
2449 }
2450
2451 fn from_verification_report(report: VerificationReport) -> Self {
2452 let code = verification_failure_code(&report);
2453 let summary = report
2454 .errors
2455 .first()
2456 .cloned()
2457 .or_else(|| report.warnings.first().cloned())
2458 .unwrap_or_else(|| "verification failed".to_string());
2459 Self {
2460 code,
2461 summary,
2462 retryable: false,
2463 details: Some(serde_json::json!({
2464 "artifact_digest": report.artifact_digest,
2465 "canonical_ref": report.canonical_ref,
2466 "errors": report.errors,
2467 "warnings": report.warnings,
2468 "failed_checks": report
2469 .checks
2470 .iter()
2471 .filter(|check| matches!(check.outcome, VerificationOutcome::Failed))
2472 .map(|check| serde_json::json!({
2473 "name": check.name,
2474 "detail": check.detail,
2475 "payload": check.payload,
2476 }))
2477 .collect::<Vec<_>>(),
2478 })),
2479 }
2480 }
2481}
2482
2483impl ArtifactOpener for DefaultArtifactOpener {
2484 fn open(
2485 &self,
2486 artifact: &ResolvedArtifact,
2487 request: &ArtifactOpenRequest,
2488 ) -> Result<ArtifactOpenOutput, IntegrationError> {
2489 Ok(ArtifactOpenOutput {
2490 bundle_manifest_summary: manifest_summary_from_artifact(artifact),
2491 bundle_open_mode: if request.dry_run {
2492 BundleOpenMode::CacheReuse
2493 } else {
2494 BundleOpenMode::Userspace
2495 },
2496 warnings: if request.smoke_test {
2497 vec!["smoke test requested but no bundle runtime opener is configured".to_string()]
2498 } else {
2499 Vec::new()
2500 },
2501 })
2502 }
2503}
2504
2505pub type ResolveError = DistError;
2506
2507#[derive(Clone, Debug)]
2508struct ComponentCache {
2509 base: PathBuf,
2510}
2511
2512impl ComponentCache {
2513 fn new(base: PathBuf) -> Self {
2514 Self { base }
2515 }
2516
2517 fn artifacts_root(&self) -> PathBuf {
2518 self.base.join("artifacts").join("sha256")
2519 }
2520
2521 fn bundle_records_root(&self) -> PathBuf {
2522 self.base.join("bundles")
2523 }
2524
2525 fn bundle_record_path(&self, bundle_id: &str) -> PathBuf {
2526 let safe = bundle_id.replace(':', "__");
2527 self.bundle_records_root().join(format!("{safe}.json"))
2528 }
2529
2530 fn component_dir(&self, digest: &str) -> PathBuf {
2531 let normalized = trim_digest_prefix(&normalize_digest(digest)).to_string();
2532 let (prefix, rest) = normalized.split_at(normalized.len().min(2));
2533 self.artifacts_root().join(prefix).join(rest)
2534 }
2535
2536 fn legacy_component_dir(&self, digest: &str) -> PathBuf {
2537 self.base
2538 .join(trim_digest_prefix(&normalize_digest(digest)))
2539 }
2540
2541 fn component_path(&self, digest: &str) -> PathBuf {
2542 self.component_dir(digest).join("blob")
2543 }
2544
2545 fn legacy_component_path(&self, digest: &str) -> PathBuf {
2546 self.legacy_component_dir(digest).join("component.wasm")
2547 }
2548
2549 fn entry_path(&self, digest: &str) -> PathBuf {
2550 self.component_dir(digest).join("entry.json")
2551 }
2552
2553 fn existing_component(&self, digest: &str) -> Option<PathBuf> {
2554 let path = self.component_path(digest);
2555 if path.exists() {
2556 let _ = self.touch_last_used(digest);
2557 Some(path)
2558 } else {
2559 let legacy = self.legacy_component_path(digest);
2560 if legacy.exists() {
2561 let _ = self.touch_last_used(digest);
2562 Some(legacy)
2563 } else {
2564 None
2565 }
2566 }
2567 }
2568
2569 fn write_component(&self, digest: &str, data: &[u8]) -> Result<PathBuf, std::io::Error> {
2570 let dir = self.component_dir(digest);
2571 fs::create_dir_all(&dir)?;
2572 let path = dir.join("blob");
2573 fs::write(&path, data)?;
2574 self.touch_last_used(digest)?;
2575 Ok(path)
2576 }
2577
2578 fn write_entry(&self, entry: &CacheEntry) -> Result<(), std::io::Error> {
2579 let path = self.entry_path(&entry.digest);
2580 let bytes = serde_json::to_vec_pretty(entry)
2581 .map_err(|err| std::io::Error::other(err.to_string()))?;
2582 fs::write(path, bytes)
2583 }
2584
2585 fn read_entry(&self, digest: &str) -> Result<CacheEntry, std::io::Error> {
2586 let path = self.entry_path(digest);
2587 let bytes = fs::read(path)?;
2588 serde_json::from_slice(&bytes).map_err(|err| std::io::Error::other(err.to_string()))
2589 }
2590
2591 fn write_bundle_record(&self, record: &BundleRecord) -> Result<(), std::io::Error> {
2592 let path = self.bundle_record_path(&record.bundle_id);
2593 if let Some(parent) = path.parent() {
2594 fs::create_dir_all(parent)?;
2595 }
2596 let bytes = serde_json::to_vec_pretty(record)
2597 .map_err(|err| std::io::Error::other(err.to_string()))?;
2598 fs::write(path, bytes)
2599 }
2600
2601 fn read_bundle_record(&self, bundle_id: &str) -> Result<BundleRecord, std::io::Error> {
2602 let path = self.bundle_record_path(bundle_id);
2603 let bytes = fs::read(path)?;
2604 serde_json::from_slice(&bytes).map_err(|err| std::io::Error::other(err.to_string()))
2605 }
2606
2607 fn remove_bundle_record(&self, bundle_id: &str) -> Result<(), std::io::Error> {
2608 let path = self.bundle_record_path(bundle_id);
2609 if path.exists() {
2610 fs::remove_file(path)
2611 } else {
2612 Ok(())
2613 }
2614 }
2615
2616 fn list_bundle_records(&self) -> Result<Vec<BundleRecord>, std::io::Error> {
2617 let root = self.bundle_records_root();
2618 let mut out = Vec::new();
2619 let Ok(entries) = fs::read_dir(root) else {
2620 return Ok(out);
2621 };
2622 for entry in entries.flatten() {
2623 let path = entry.path();
2624 if !path.is_file() {
2625 continue;
2626 }
2627 let bytes = fs::read(&path)?;
2628 let record = serde_json::from_slice::<BundleRecord>(&bytes)
2629 .map_err(|err| std::io::Error::other(format!("{}: {err}", path.display())))?;
2630 out.push(record);
2631 }
2632 out.sort_by(|a, b| a.bundle_id.cmp(&b.bundle_id));
2633 Ok(out)
2634 }
2635
2636 fn list_digests(&self) -> Vec<String> {
2637 let mut digests = Vec::new();
2638 self.collect_digests(&self.base, &mut digests);
2639 let root = self.artifacts_root();
2640 if root != self.base {
2641 self.collect_digests(&root, &mut digests);
2642 }
2643 digests.sort();
2644 digests.dedup();
2645 digests
2646 }
2647
2648 fn collect_digests(&self, dir: &Path, digests: &mut Vec<String>) {
2649 let _ = &self.base;
2650 if let Ok(entries) = fs::read_dir(dir) {
2651 for entry in entries.flatten() {
2652 let path = entry.path();
2653 if path.is_dir() {
2654 let blob = path.join("blob");
2655 let legacy_blob = path.join("component.wasm");
2656 if blob.exists() || legacy_blob.exists() {
2657 if let Some(digest) = digest_from_component_dir(&path) {
2658 digests.push(digest);
2659 }
2660 } else {
2661 self.collect_digests(&path, digests);
2662 }
2663 }
2664 }
2665 }
2666 }
2667
2668 fn touch_last_used(&self, digest: &str) -> Result<(), std::io::Error> {
2669 let marker = self.component_dir(digest).join("last_used");
2670 let seq = LAST_USED_COUNTER.fetch_add(1, Ordering::Relaxed);
2671 fs::write(marker, seq.to_string())
2672 }
2673}
2674
2675fn unix_now() -> u64 {
2676 SystemTime::now()
2677 .duration_since(UNIX_EPOCH)
2678 .unwrap_or_default()
2679 .as_secs()
2680}
2681
2682fn artifact_source_from_reference(
2683 reference: &str,
2684 opts: &DistOptions,
2685) -> Result<ArtifactSource, DistError> {
2686 let kind = match classify_reference(reference)? {
2687 RefKind::Digest(_) => ArtifactSourceKind::CacheDigest,
2688 RefKind::Http(_) => ArtifactSourceKind::Https,
2689 RefKind::File(_) => ArtifactSourceKind::File,
2690 RefKind::Oci(_) => ArtifactSourceKind::Oci,
2691 RefKind::Repo(_) => ArtifactSourceKind::Repo,
2692 RefKind::Store(_) => ArtifactSourceKind::Store,
2693 #[cfg(feature = "fixture-resolver")]
2694 RefKind::Fixture(_) => ArtifactSourceKind::Fixture,
2695 };
2696 Ok(ArtifactSource {
2697 raw_ref: reference.to_string(),
2698 kind: kind.clone(),
2699 transport_hints: TransportHints {
2700 offline: opts.offline,
2701 allow_insecure_local_http: opts.allow_insecure_local_http,
2702 },
2703 dev_mode: matches!(kind, ArtifactSourceKind::Fixture | ArtifactSourceKind::File),
2704 })
2705}
2706
2707fn canonical_oci_ref(reference: &str, digest: &str) -> String {
2708 let repo = canonical_oci_component_id(reference);
2709 format!("oci://{repo}@{digest}")
2710}
2711
2712fn digest_from_component_dir(dir: &Path) -> Option<String> {
2713 let leaf = dir.file_name()?.to_str()?;
2714 if leaf.len() == 64 && leaf.chars().all(|ch| ch.is_ascii_hexdigit()) {
2715 return Some(format!("sha256:{leaf}"));
2716 }
2717 let parent = dir.parent()?.file_name()?.to_str()?;
2718 if parent.len() == 2 && leaf.chars().all(|ch| ch.is_ascii_hexdigit()) {
2719 Some(format!("sha256:{parent}{leaf}"))
2720 } else {
2721 None
2722 }
2723}
2724
2725fn cache_entry_state_from_integrity(state: &IntegrityState) -> CacheEntryState {
2726 match state {
2727 IntegrityState::Partial => CacheEntryState::Partial,
2728 IntegrityState::Ready => CacheEntryState::Ready,
2729 IntegrityState::Corrupt => CacheEntryState::Corrupt,
2730 IntegrityState::Evicted => CacheEntryState::Evicted,
2731 }
2732}
2733
2734fn integrity_state_from_entry(state: &CacheEntryState) -> IntegrityState {
2735 match state {
2736 CacheEntryState::Partial => IntegrityState::Partial,
2737 CacheEntryState::Ready => IntegrityState::Ready,
2738 CacheEntryState::Corrupt => IntegrityState::Corrupt,
2739 CacheEntryState::Evicted => IntegrityState::Evicted,
2740 }
2741}
2742
2743fn descriptor_from_entry(entry: &CacheEntry) -> ArtifactDescriptor {
2744 ArtifactDescriptor {
2745 artifact_type: entry.artifact_type.clone(),
2746 source_kind: entry.source_kind.clone(),
2747 raw_ref: entry.raw_ref.clone(),
2748 canonical_ref: entry.canonical_ref.clone(),
2749 digest: entry.digest.clone(),
2750 media_type: entry.media_type.clone(),
2751 size_bytes: entry.size_bytes,
2752 created_at: None,
2753 annotations: serde_json::Map::new(),
2754 manifest_digest: None,
2755 resolved_via: match entry.source_kind {
2756 ArtifactSourceKind::Repo => ResolvedVia::RepoMapping,
2757 ArtifactSourceKind::Store => ResolvedVia::StoreMapping,
2758 ArtifactSourceKind::Fixture => ResolvedVia::Fixture,
2759 ArtifactSourceKind::File => ResolvedVia::File,
2760 ArtifactSourceKind::Https => ResolvedVia::Https,
2761 ArtifactSourceKind::CacheDigest => ResolvedVia::CacheDigest,
2762 ArtifactSourceKind::Oci => ResolvedVia::Direct,
2763 },
2764 signature_refs: Vec::new(),
2765 sbom_refs: Vec::new(),
2766 }
2767}
2768
2769fn legacy_source_from_entry(entry: &CacheEntry) -> LegacyArtifactSource {
2770 match entry.source_kind {
2771 ArtifactSourceKind::CacheDigest => LegacyArtifactSource::Digest,
2772 ArtifactSourceKind::Https => LegacyArtifactSource::Http(entry.raw_ref.clone()),
2773 ArtifactSourceKind::File | ArtifactSourceKind::Fixture => {
2774 LegacyArtifactSource::File(PathBuf::from(entry.raw_ref.clone()))
2775 }
2776 ArtifactSourceKind::Oci => {
2777 LegacyArtifactSource::Oci(entry.canonical_ref.trim_start_matches("oci://").to_string())
2778 }
2779 ArtifactSourceKind::Repo => LegacyArtifactSource::Repo(entry.raw_ref.clone()),
2780 ArtifactSourceKind::Store => LegacyArtifactSource::Store(entry.raw_ref.clone()),
2781 }
2782}
2783
2784fn component_id_from_descriptor(entry: &CacheEntry) -> String {
2785 component_id_from_ref(&match entry.source_kind {
2786 ArtifactSourceKind::CacheDigest => RefKind::Digest(entry.digest.clone()),
2787 ArtifactSourceKind::Https => RefKind::Http(entry.raw_ref.clone()),
2788 ArtifactSourceKind::File | ArtifactSourceKind::Fixture => {
2789 RefKind::File(PathBuf::from(entry.raw_ref.clone()))
2790 }
2791 ArtifactSourceKind::Oci => {
2792 RefKind::Oci(entry.canonical_ref.trim_start_matches("oci://").to_string())
2793 }
2794 ArtifactSourceKind::Repo => RefKind::Repo(entry.raw_ref.clone()),
2795 ArtifactSourceKind::Store => RefKind::Store(entry.raw_ref.clone()),
2796 })
2797}
2798
2799fn verification_outcome_name(outcome: &VerificationOutcome) -> &'static str {
2800 match outcome {
2801 VerificationOutcome::Passed => "passed",
2802 VerificationOutcome::Failed => "failed",
2803 VerificationOutcome::Warning => "warning",
2804 VerificationOutcome::Skipped => "skipped",
2805 }
2806}
2807
2808fn make_check(
2809 name: &str,
2810 outcome: VerificationOutcome,
2811 detail: impl Into<String>,
2812 payload: Option<serde_json::Value>,
2813) -> VerificationCheck {
2814 VerificationCheck {
2815 name: name.to_string(),
2816 outcome,
2817 detail: detail.into(),
2818 payload,
2819 }
2820}
2821
2822fn preliminary_decision_from_checks(checks: Vec<VerificationCheck>) -> PreliminaryDecision {
2823 let warnings = checks
2824 .iter()
2825 .filter(|check| matches!(check.outcome, VerificationOutcome::Warning))
2826 .map(|check| check.detail.clone())
2827 .collect::<Vec<_>>();
2828 let errors = checks
2829 .iter()
2830 .filter(|check| matches!(check.outcome, VerificationOutcome::Failed))
2831 .map(|check| check.detail.clone())
2832 .collect::<Vec<_>>();
2833 PreliminaryDecision {
2834 passed: errors.is_empty(),
2835 checks,
2836 warnings,
2837 errors,
2838 }
2839}
2840
2841fn verification_report_from_checks(
2842 descriptor: &ArtifactDescriptor,
2843 advisory_set: Option<&AdvisorySet>,
2844 verification_policy: &VerificationPolicy,
2845 cache_entry: Option<&CacheEntry>,
2846 checks: Vec<VerificationCheck>,
2847) -> VerificationReport {
2848 let warnings = checks
2849 .iter()
2850 .filter(|check| matches!(check.outcome, VerificationOutcome::Warning))
2851 .map(|check| check.detail.clone())
2852 .collect::<Vec<_>>();
2853 let errors = checks
2854 .iter()
2855 .filter(|check| matches!(check.outcome, VerificationOutcome::Failed))
2856 .map(|check| check.detail.clone())
2857 .collect::<Vec<_>>();
2858
2859 VerificationReport {
2860 artifact_digest: descriptor.digest.clone(),
2861 canonical_ref: descriptor.canonical_ref.clone(),
2862 passed: errors.is_empty(),
2863 warnings,
2864 errors,
2865 policy_fingerprint: policy_fingerprint(verification_policy),
2866 advisory_version: advisory_set.map(|advisory| advisory.version.clone()),
2867 cache_entry_fingerprint: cache_entry.map(cache_entry_fingerprint),
2868 checks,
2869 }
2870}
2871
2872fn policy_fingerprint(policy: &VerificationPolicy) -> String {
2873 let bytes = serde_json::to_vec(policy).unwrap_or_default();
2874 digest_for_bytes(&bytes)
2875}
2876
2877fn cache_entry_fingerprint(entry: &CacheEntry) -> String {
2878 let bytes = serde_json::to_vec(entry).unwrap_or_default();
2879 digest_for_bytes(&bytes)
2880}
2881
2882fn issuer_from_descriptor(descriptor: &ArtifactDescriptor) -> Option<String> {
2883 descriptor
2884 .annotations
2885 .get("issuer")
2886 .and_then(|value| value.as_str())
2887 .map(|raw| raw.to_string())
2888}
2889
2890fn minimum_operator_version_from_descriptor(descriptor: &ArtifactDescriptor) -> Option<String> {
2891 descriptor
2892 .annotations
2893 .get("minimum_operator_version")
2894 .and_then(|value| value.as_str())
2895 .map(|raw| raw.to_string())
2896}
2897
2898fn check_digest_allowed(
2899 digest: &str,
2900 advisory_set: Option<&AdvisorySet>,
2901 verification_policy: &VerificationPolicy,
2902) -> VerificationCheck {
2903 let denied = verification_policy
2904 .deny_digests
2905 .iter()
2906 .chain(
2907 advisory_set
2908 .into_iter()
2909 .flat_map(|advisory| advisory.deny_digests.iter()),
2910 )
2911 .any(|candidate| candidate == digest);
2912 if denied {
2913 make_check(
2914 "digest_allowed",
2915 VerificationOutcome::Failed,
2916 format!("digest {digest} is denied by policy or advisory"),
2917 Some(serde_json::json!({
2918 "digest": digest,
2919 "advisory_version": advisory_set.map(|advisory| advisory.version.clone()),
2920 })),
2921 )
2922 } else {
2923 make_check(
2924 "digest_allowed",
2925 VerificationOutcome::Passed,
2926 format!("digest {digest} is allowed"),
2927 Some(serde_json::json!({
2928 "digest": digest,
2929 })),
2930 )
2931 }
2932}
2933
2934fn check_media_type_allowed(
2935 media_type: &str,
2936 verification_policy: &VerificationPolicy,
2937) -> VerificationCheck {
2938 if verification_policy.allowed_media_types.is_empty() {
2939 return make_check(
2940 "media_type_allowed",
2941 VerificationOutcome::Skipped,
2942 "no media type allowlist configured",
2943 Some(serde_json::json!({
2944 "media_type": media_type,
2945 })),
2946 );
2947 }
2948 if verification_policy
2949 .allowed_media_types
2950 .iter()
2951 .any(|candidate| candidate == media_type)
2952 {
2953 make_check(
2954 "media_type_allowed",
2955 VerificationOutcome::Passed,
2956 format!("media type {media_type} is allowed"),
2957 Some(serde_json::json!({
2958 "media_type": media_type,
2959 })),
2960 )
2961 } else {
2962 make_check(
2963 "media_type_allowed",
2964 VerificationOutcome::Failed,
2965 format!("media type {media_type} is not allowed"),
2966 Some(serde_json::json!({
2967 "media_type": media_type,
2968 "allowed_media_types": verification_policy.allowed_media_types,
2969 })),
2970 )
2971 }
2972}
2973
2974fn check_issuer_allowed(
2975 issuer: Option<String>,
2976 advisory_set: Option<&AdvisorySet>,
2977 verification_policy: &VerificationPolicy,
2978) -> VerificationCheck {
2979 let Some(issuer) = issuer else {
2980 return make_check(
2981 "issuer_allowed",
2982 VerificationOutcome::Warning,
2983 "issuer metadata is missing",
2984 Some(serde_json::json!({
2985 "issuer": null,
2986 "advisory_version": advisory_set.map(|advisory| advisory.version.clone()),
2987 })),
2988 );
2989 };
2990 if verification_policy
2991 .deny_issuers
2992 .iter()
2993 .chain(
2994 advisory_set
2995 .into_iter()
2996 .flat_map(|advisory| advisory.deny_issuers.iter()),
2997 )
2998 .any(|candidate| candidate == &issuer)
2999 {
3000 return make_check(
3001 "issuer_allowed",
3002 VerificationOutcome::Failed,
3003 format!("issuer {issuer} is denied"),
3004 Some(serde_json::json!({
3005 "issuer": issuer,
3006 "advisory_version": advisory_set.map(|advisory| advisory.version.clone()),
3007 })),
3008 );
3009 }
3010 if !verification_policy.trusted_issuers.is_empty()
3011 && !verification_policy
3012 .trusted_issuers
3013 .iter()
3014 .any(|candidate| candidate == &issuer)
3015 {
3016 return make_check(
3017 "issuer_allowed",
3018 VerificationOutcome::Warning,
3019 format!("issuer {issuer} is not on the trusted issuer allowlist"),
3020 Some(serde_json::json!({
3021 "issuer": issuer,
3022 "trusted_issuers": verification_policy.trusted_issuers,
3023 })),
3024 );
3025 }
3026 make_check(
3027 "issuer_allowed",
3028 VerificationOutcome::Passed,
3029 format!("issuer {issuer} is allowed"),
3030 Some(serde_json::json!({
3031 "issuer": issuer,
3032 })),
3033 )
3034}
3035
3036fn check_operator_version_compatible(
3037 descriptor: &ArtifactDescriptor,
3038 advisory_set: Option<&AdvisorySet>,
3039 verification_policy: &VerificationPolicy,
3040) -> VerificationCheck {
3041 let required = verification_policy
3042 .minimum_operator_version
3043 .clone()
3044 .or_else(|| advisory_set.and_then(|advisory| advisory.minimum_operator_version.clone()))
3045 .or_else(|| minimum_operator_version_from_descriptor(descriptor));
3046 let Some(required) = required else {
3047 return make_check(
3048 "operator_version_compatible",
3049 VerificationOutcome::Skipped,
3050 "no minimum operator version requirement present",
3051 Some(serde_json::json!({
3052 "required": null,
3053 "actual": descriptor
3054 .annotations
3055 .get("operator_version")
3056 .and_then(|value| value.as_str()),
3057 })),
3058 );
3059 };
3060
3061 let actual = descriptor
3062 .annotations
3063 .get("operator_version")
3064 .and_then(|value| value.as_str())
3065 .map(|raw| raw.to_string());
3066 let Some(actual) = actual else {
3067 return make_check(
3068 "operator_version_compatible",
3069 VerificationOutcome::Warning,
3070 format!(
3071 "minimum operator version {required} is declared but actual operator version is unknown"
3072 ),
3073 Some(serde_json::json!({
3074 "required": required,
3075 "actual": null,
3076 })),
3077 );
3078 };
3079
3080 let required_parsed = semver::Version::parse(&required);
3081 let actual_parsed = semver::Version::parse(&actual);
3082 match (required_parsed, actual_parsed) {
3083 (Ok(required), Ok(actual)) if actual >= required => make_check(
3084 "operator_version_compatible",
3085 VerificationOutcome::Passed,
3086 format!("operator version {actual} satisfies minimum {required}"),
3087 Some(serde_json::json!({
3088 "required": required.to_string(),
3089 "actual": actual.to_string(),
3090 })),
3091 ),
3092 (Ok(required), Ok(actual)) => {
3093 let outcome = match verification_policy.environment {
3094 VerificationEnvironment::Dev => VerificationOutcome::Warning,
3095 VerificationEnvironment::Staging | VerificationEnvironment::Prod => {
3096 VerificationOutcome::Failed
3097 }
3098 };
3099 make_check(
3100 "operator_version_compatible",
3101 outcome,
3102 format!("operator version {actual} does not satisfy minimum {required}"),
3103 Some(serde_json::json!({
3104 "required": required.to_string(),
3105 "actual": actual.to_string(),
3106 "environment": verification_environment_name(&verification_policy.environment),
3107 })),
3108 )
3109 }
3110 _ => make_check(
3111 "operator_version_compatible",
3112 VerificationOutcome::Warning,
3113 format!(
3114 "operator version metadata is not parseable (actual={actual}, required={required})"
3115 ),
3116 Some(serde_json::json!({
3117 "required": required,
3118 "actual": actual,
3119 })),
3120 ),
3121 }
3122}
3123
3124fn check_content_digest_match(
3125 resolved_artifact: &ResolvedArtifact,
3126) -> Result<VerificationCheck, DistError> {
3127 let bytes = resolved_artifact.wasm_bytes()?;
3128 let computed = digest_for_bytes(bytes);
3129 Ok(if computed == resolved_artifact.descriptor.digest {
3130 make_check(
3131 "content_digest_match",
3132 VerificationOutcome::Passed,
3133 format!(
3134 "content digest matches {}",
3135 resolved_artifact.descriptor.digest
3136 ),
3137 None,
3138 )
3139 } else {
3140 make_check(
3141 "content_digest_match",
3142 VerificationOutcome::Failed,
3143 format!(
3144 "content digest {} did not match descriptor {}",
3145 computed, resolved_artifact.descriptor.digest
3146 ),
3147 Some(serde_json::json!({
3148 "expected": resolved_artifact.descriptor.digest,
3149 "actual": computed,
3150 })),
3151 )
3152 })
3153}
3154
3155fn check_signature_present(
3156 descriptor: &ArtifactDescriptor,
3157 verification_policy: &VerificationPolicy,
3158) -> VerificationCheck {
3159 if descriptor.signature_refs.is_empty() {
3160 let outcome = match verification_policy.environment {
3161 VerificationEnvironment::Dev => VerificationOutcome::Warning,
3162 VerificationEnvironment::Staging | VerificationEnvironment::Prod => {
3163 if verification_policy.require_signature {
3164 VerificationOutcome::Failed
3165 } else {
3166 VerificationOutcome::Warning
3167 }
3168 }
3169 };
3170 return make_check(
3171 "signature_present",
3172 outcome,
3173 "no signature references are present",
3174 Some(serde_json::json!({
3175 "count": 0,
3176 "required": verification_policy.require_signature,
3177 "environment": verification_environment_name(&verification_policy.environment),
3178 })),
3179 );
3180 }
3181 make_check(
3182 "signature_present",
3183 VerificationOutcome::Passed,
3184 "signature references are present",
3185 Some(serde_json::json!({
3186 "count": descriptor.signature_refs.len(),
3187 "required": verification_policy.require_signature,
3188 "environment": verification_environment_name(&verification_policy.environment),
3189 })),
3190 )
3191}
3192
3193fn check_signature_verified(
3194 descriptor: &ArtifactDescriptor,
3195 verification_policy: &VerificationPolicy,
3196) -> VerificationCheck {
3197 let detail = if descriptor.signature_refs.is_empty() {
3198 "signature verification could not run because no signature references are present"
3199 } else {
3200 "signature verification is not implemented in the open-source client"
3201 };
3202 let outcome = if verification_policy.require_signature {
3203 match verification_policy.environment {
3204 VerificationEnvironment::Dev => VerificationOutcome::Warning,
3205 VerificationEnvironment::Staging | VerificationEnvironment::Prod => {
3206 VerificationOutcome::Failed
3207 }
3208 }
3209 } else if descriptor.signature_refs.is_empty() {
3210 VerificationOutcome::Skipped
3211 } else {
3212 VerificationOutcome::Warning
3213 };
3214 make_check(
3215 "signature_verified",
3216 outcome,
3217 detail,
3218 Some(serde_json::json!({
3219 "implemented": false,
3220 "signature_count": descriptor.signature_refs.len(),
3221 "required": verification_policy.require_signature,
3222 "environment": verification_environment_name(&verification_policy.environment),
3223 })),
3224 )
3225}
3226
3227fn check_sbom_present(
3228 descriptor: &ArtifactDescriptor,
3229 verification_policy: &VerificationPolicy,
3230) -> VerificationCheck {
3231 if descriptor.sbom_refs.is_empty() {
3232 return make_check(
3233 "sbom_present",
3234 if verification_policy.require_sbom {
3235 VerificationOutcome::Failed
3236 } else {
3237 VerificationOutcome::Warning
3238 },
3239 "no SBOM references are present",
3240 Some(serde_json::json!({
3241 "count": 0,
3242 "required": verification_policy.require_sbom,
3243 })),
3244 );
3245 }
3246 make_check(
3247 "sbom_present",
3248 VerificationOutcome::Passed,
3249 "SBOM references are present",
3250 Some(serde_json::json!({
3251 "count": descriptor.sbom_refs.len(),
3252 "required": verification_policy.require_sbom,
3253 })),
3254 )
3255}
3256
3257fn verification_environment_name(environment: &VerificationEnvironment) -> &'static str {
3258 match environment {
3259 VerificationEnvironment::Dev => "dev",
3260 VerificationEnvironment::Staging => "staging",
3261 VerificationEnvironment::Prod => "prod",
3262 }
3263}
3264
3265fn bundle_id_for_digest(digest: &str) -> String {
3266 format!("bundle:{}", normalize_digest(digest))
3267}
3268
3269fn digest_from_bundle_id(bundle_id: &str) -> Option<String> {
3270 bundle_id
3271 .strip_prefix("bundle:")
3272 .map(normalize_digest)
3273 .filter(|digest| digest.starts_with("sha256:"))
3274}
3275
3276fn manifest_summary_from_artifact(artifact: &ResolvedArtifact) -> BundleManifestSummary {
3277 BundleManifestSummary {
3278 component_id: artifact.component_id.clone(),
3279 abi_version: artifact.abi_version.clone(),
3280 describe_artifact_ref: artifact.describe_artifact_ref.clone(),
3281 artifact_type: artifact.descriptor.artifact_type.clone(),
3282 media_type: artifact.descriptor.media_type.clone(),
3283 size_bytes: artifact.descriptor.size_bytes,
3284 }
3285}
3286
3287fn verification_failure_code(report: &VerificationReport) -> IntegrationErrorCode {
3288 for check in &report.checks {
3289 if !matches!(check.outcome, VerificationOutcome::Failed) {
3290 continue;
3291 }
3292 return match check.name.as_str() {
3293 "digest_allowed" => IntegrationErrorCode::DigestDenied,
3294 "media_type_allowed" => IntegrationErrorCode::MediaTypeRejected,
3295 "issuer_allowed" => IntegrationErrorCode::IssuerRejected,
3296 "content_digest_match" => IntegrationErrorCode::DigestMismatch,
3297 "signature_present" | "signature_verified" => IntegrationErrorCode::SignatureRequired,
3298 "operator_version_compatible" => IntegrationErrorCode::VerificationFailed,
3299 "sbom_present" => IntegrationErrorCode::VerificationFailed,
3300 _ => IntegrationErrorCode::VerificationFailed,
3301 };
3302 }
3303 IntegrationErrorCode::VerificationFailed
3304}
3305
3306fn retention_decisions(input: &RetentionInput) -> Vec<RetentionDecision> {
3307 let mut decisions = Vec::with_capacity(input.entries.len());
3308 let protected_active = input
3309 .active_bundle_ids
3310 .iter()
3311 .cloned()
3312 .collect::<std::collections::BTreeSet<_>>();
3313 let protected_staged = input
3314 .staged_bundle_ids
3315 .iter()
3316 .cloned()
3317 .collect::<std::collections::BTreeSet<_>>();
3318 let protected_warming = input
3319 .warming_bundle_ids
3320 .iter()
3321 .cloned()
3322 .collect::<std::collections::BTreeSet<_>>();
3323 let protected_ready = input
3324 .ready_bundle_ids
3325 .iter()
3326 .cloned()
3327 .collect::<std::collections::BTreeSet<_>>();
3328 let protected_draining = input
3329 .draining_bundle_ids
3330 .iter()
3331 .cloned()
3332 .collect::<std::collections::BTreeSet<_>>();
3333 let protected_session = input
3334 .session_referenced_bundle_ids
3335 .iter()
3336 .cloned()
3337 .collect::<std::collections::BTreeSet<_>>();
3338 let protected_rollback = rollback_protected_bundle_ids(
3339 &input.active_bundle_ids,
3340 &input.staged_bundle_ids,
3341 &input.entries,
3342 input.minimum_rollback_depth,
3343 );
3344
3345 let now = unix_now();
3346 let mut candidate_indices = Vec::new();
3347 let mut total_bytes = input
3348 .entries
3349 .iter()
3350 .map(|entry| entry.size_bytes)
3351 .sum::<u64>();
3352
3353 for (index, entry) in input.entries.iter().enumerate() {
3354 let bundle_id = bundle_id_for_digest(&entry.digest);
3355 let protection = if protected_active.contains(&bundle_id) {
3356 Some(("active_bundle", "bundle is currently active".to_string()))
3357 } else if protected_session.contains(&bundle_id) {
3358 Some((
3359 "session_reference",
3360 "bundle is referenced by a live session".to_string(),
3361 ))
3362 } else if protected_warming.contains(&bundle_id) {
3363 Some(("warming_bundle", "bundle is currently warming".to_string()))
3364 } else if protected_ready.contains(&bundle_id) {
3365 Some(("ready_bundle", "bundle is currently ready".to_string()))
3366 } else if protected_draining.contains(&bundle_id) {
3367 Some((
3368 "draining_bundle",
3369 "bundle is currently draining".to_string(),
3370 ))
3371 } else if protected_staged.contains(&bundle_id) {
3372 Some(("staged_bundle", "bundle is currently staged".to_string()))
3373 } else if protected_rollback.contains(&bundle_id) {
3374 Some((
3375 "rollback_depth",
3376 "bundle is protected for rollback depth".to_string(),
3377 ))
3378 } else {
3379 None
3380 };
3381
3382 if let Some((reason_code, reason_detail)) = protection {
3383 decisions.push(RetentionDecision {
3384 cache_key: entry.cache_key.clone(),
3385 bundle_id,
3386 decision: RetentionDisposition::Protect,
3387 reason_code: reason_code.to_string(),
3388 reason_detail,
3389 });
3390 continue;
3391 }
3392
3393 decisions.push(RetentionDecision {
3394 cache_key: entry.cache_key.clone(),
3395 bundle_id,
3396 decision: RetentionDisposition::Keep,
3397 reason_code: "within_policy".to_string(),
3398 reason_detail: "entry remains available".to_string(),
3399 });
3400 candidate_indices.push(index);
3401 }
3402
3403 let mut candidate_order = candidate_indices
3404 .into_iter()
3405 .map(|index| {
3406 let entry = &input.entries[index];
3407 let is_corrupt = matches!(entry.state, CacheEntryState::Corrupt);
3408 let rollback_eligible =
3409 protected_rollback.contains(&bundle_id_for_digest(&entry.digest));
3410 let age_secs = now.saturating_sub(entry.last_accessed_at);
3411 (index, is_corrupt, rollback_eligible, age_secs)
3412 })
3413 .collect::<Vec<_>>();
3414 candidate_order.sort_by(|a, b| {
3415 b.1.cmp(&a.1)
3416 .then_with(|| a.2.cmp(&b.2))
3417 .then_with(|| b.3.cmp(&a.3))
3418 .then_with(|| {
3419 input.entries[a.0]
3420 .cache_key
3421 .cmp(&input.entries[b.0].cache_key)
3422 })
3423 });
3424
3425 for (index, is_corrupt, _, age_secs) in candidate_order {
3426 let entry = &input.entries[index];
3427 let aged_out = input
3428 .max_entry_age
3429 .is_some_and(|max_age| age_secs > max_age);
3430 let over_budget = input.max_cache_bytes > 0 && total_bytes > input.max_cache_bytes;
3431 if !is_corrupt && !aged_out && !over_budget {
3432 continue;
3433 }
3434
3435 let decision = &mut decisions[index];
3436 decision.decision = RetentionDisposition::Evict;
3437 if is_corrupt {
3438 decision.reason_code = "corrupt_entry".to_string();
3439 decision.reason_detail = "corrupt entry can be evicted safely".to_string();
3440 } else if aged_out {
3441 decision.reason_code = "max_age_exceeded".to_string();
3442 decision.reason_detail = "entry exceeded retention age".to_string();
3443 } else {
3444 decision.reason_code = "cache_budget".to_string();
3445 decision.reason_detail =
3446 "entry selected for eviction under cache budget pressure".to_string();
3447 }
3448 total_bytes = total_bytes.saturating_sub(entry.size_bytes);
3449 }
3450
3451 decisions
3452}
3453
3454fn rollback_protected_bundle_ids(
3455 active_bundle_ids: &[String],
3456 staged_bundle_ids: &[String],
3457 entries: &[CacheEntry],
3458 minimum_rollback_depth: usize,
3459) -> std::collections::BTreeSet<String> {
3460 if minimum_rollback_depth == 0 {
3461 return std::collections::BTreeSet::new();
3462 }
3463 let active = active_bundle_ids
3464 .iter()
3465 .cloned()
3466 .collect::<std::collections::BTreeSet<_>>();
3467 let mut protected = std::collections::BTreeSet::new();
3468 let ordered_bundle_ids = if staged_bundle_ids.is_empty() {
3469 let mut entries = entries.iter().collect::<Vec<_>>();
3470 entries.sort_by(|a, b| {
3471 a.fetched_at
3472 .cmp(&b.fetched_at)
3473 .then_with(|| a.cache_key.cmp(&b.cache_key))
3474 });
3475 entries
3476 .into_iter()
3477 .map(|entry| bundle_id_for_digest(&entry.digest))
3478 .collect::<Vec<_>>()
3479 } else {
3480 staged_bundle_ids.to_vec()
3481 };
3482 for bundle_id in ordered_bundle_ids.iter().rev() {
3483 if active.contains(bundle_id) {
3484 continue;
3485 }
3486 protected.insert(bundle_id.clone());
3487 if protected.len() >= minimum_rollback_depth {
3488 break;
3489 }
3490 }
3491 protected
3492}
3493
3494fn digest_for_bytes(bytes: &[u8]) -> String {
3495 let mut hasher = Sha256::new();
3496 hasher.update(bytes);
3497 format!("sha256:{:x}", hasher.finalize())
3498}
3499
3500fn trim_digest_prefix(digest: &str) -> &str {
3501 digest
3502 .strip_prefix("sha256:")
3503 .unwrap_or_else(|| digest.trim_start_matches('@'))
3504}
3505
3506fn normalize_digest(digest: &str) -> String {
3507 if digest.starts_with("sha256:") {
3508 digest.to_string()
3509 } else {
3510 format!("sha256:{digest}")
3511 }
3512}
3513
3514fn component_id_from_ref(kind: &RefKind) -> String {
3515 match kind {
3516 RefKind::Digest(digest) => digest.clone(),
3517 RefKind::Http(url) => Url::parse(url)
3518 .ok()
3519 .and_then(|u| {
3520 Path::new(u.path())
3521 .file_stem()
3522 .and_then(|stem| stem.to_str())
3523 .map(strip_file_component_suffix)
3524 })
3525 .filter(|id| !id.is_empty())
3526 .unwrap_or_else(|| "http-component".to_string()),
3527 RefKind::File(path) => path
3528 .file_stem()
3529 .and_then(|stem| stem.to_str())
3530 .map(strip_file_component_suffix)
3531 .filter(|id| !id.is_empty())
3532 .unwrap_or_else(|| "file-component".to_string()),
3533 RefKind::Oci(reference) => canonical_oci_component_id(reference),
3534 RefKind::Repo(reference) => reference.trim_start_matches("repo://").to_string(),
3535 RefKind::Store(reference) => reference.trim_start_matches("store://").to_string(),
3536 #[cfg(feature = "fixture-resolver")]
3537 RefKind::Fixture(reference) => reference.trim_start_matches('/').to_string(),
3538 }
3539}
3540
3541fn canonical_oci_component_id(reference: &str) -> String {
3542 let raw = reference.trim_start_matches("oci://");
3543 if let Ok(parsed) = Reference::try_from(raw) {
3544 return parsed.repository().to_string();
3545 }
3546 let without_digest = raw.split('@').next().unwrap_or(raw);
3547 let last_colon = without_digest.rfind(':');
3548 let last_slash = without_digest.rfind('/');
3549 if let (Some(colon), Some(slash)) = (last_colon, last_slash)
3550 && colon > slash
3551 {
3552 return without_digest[..colon].to_string();
3553 }
3554 without_digest.to_string()
3555}
3556
3557fn strip_file_component_suffix(input: &str) -> String {
3558 if let Some((prefix, suffix)) = input.rsplit_once("__")
3559 && !prefix.is_empty()
3560 && suffix.chars().all(|ch| ch.is_ascii_digit() || ch == '_')
3561 && suffix.contains('_')
3562 {
3563 return prefix.to_string();
3564 }
3565 input.to_string()
3566}
3567
3568fn normalize_content_type(current: Option<&str>, fallback: &str) -> String {
3569 let value = current.unwrap_or("").trim();
3570 if value.is_empty() {
3571 fallback.to_string()
3572 } else {
3573 value.to_string()
3574 }
3575}
3576
3577fn file_size_if_exists(path: &Path) -> Option<u64> {
3578 path.metadata().ok().map(|m| m.len())
3579}
3580
3581fn source_sidecar_describe_ref(source_wasm_path: &Path) -> Option<String> {
3582 let parent = source_wasm_path.parent()?;
3583 let describe = parent.join("describe.cbor");
3584 if !describe.exists() {
3585 return None;
3586 }
3587 Some(describe.display().to_string())
3588}
3589
3590fn resolve_component_id_from_cache(wasm_path: &Path, fallback: &str) -> String {
3591 let cache_dir = match wasm_path.parent() {
3592 Some(dir) => dir,
3593 None => return fallback.to_string(),
3594 };
3595 read_component_id_from_json(cache_dir.join("metadata.json"))
3596 .or_else(|| read_component_id_from_json(cache_dir.join("component.manifest.json")))
3597 .unwrap_or_else(|| fallback.to_string())
3598}
3599
3600fn read_component_id_from_json(path: PathBuf) -> Option<String> {
3601 let bytes = fs::read(path).ok()?;
3602 let value: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
3603 extract_string_anywhere(
3604 &value,
3605 &["component_id", "componentId", "canonical_component_id"],
3606 )
3607 .filter(|id| !id.trim().is_empty())
3608}
3609
3610fn resolve_abi_version_from_cache(wasm_path: &Path) -> Option<String> {
3611 let cache_dir = wasm_path.parent()?;
3612 read_abi_version_from_json(cache_dir.join("metadata.json"))
3613 .or_else(|| read_abi_version_from_json(cache_dir.join("component.manifest.json")))
3614}
3615
3616fn resolve_describe_artifact_ref_from_cache(wasm_path: &Path) -> Option<String> {
3617 let cache_dir = wasm_path.parent()?;
3618 read_describe_artifact_ref_from_json(cache_dir.join("metadata.json"))
3619 .or_else(|| read_describe_artifact_ref_from_json(cache_dir.join("component.manifest.json")))
3620 .or_else(|| {
3621 let describe = cache_dir.join("describe.cbor");
3622 if describe.exists() {
3623 Some(describe.display().to_string())
3624 } else {
3625 None
3626 }
3627 })
3628}
3629
3630fn read_describe_artifact_ref_from_json(path: PathBuf) -> Option<String> {
3631 let bytes = fs::read(path).ok()?;
3632 let value: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
3633 let found = extract_string_anywhere(
3634 &value,
3635 &[
3636 "describe_artifact_ref",
3637 "describeArtifactRef",
3638 "describe_ref",
3639 "describeRef",
3640 ],
3641 )?;
3642 let trimmed = found.trim();
3643 if trimmed.is_empty() {
3644 None
3645 } else {
3646 Some(trimmed.to_string())
3647 }
3648}
3649
3650fn read_abi_version_from_json(path: PathBuf) -> Option<String> {
3651 let bytes = fs::read(path).ok()?;
3652 let value: serde_json::Value = serde_json::from_slice(&bytes).ok()?;
3653 let found = extract_string_anywhere(&value, &["abi_version", "abiVersion", "abi"])?;
3654 normalize_abi_version(&found)
3655}
3656
3657fn normalize_abi_version(input: &str) -> Option<String> {
3658 let candidate = input.trim();
3659 if candidate.is_empty() {
3660 return None;
3661 }
3662 match semver::Version::parse(candidate) {
3663 Ok(version) => Some(version.to_string()),
3664 Err(_) => None,
3665 }
3666}
3667
3668fn extract_string_anywhere(value: &serde_json::Value, keys: &[&str]) -> Option<String> {
3669 match value {
3670 serde_json::Value::Object(map) => {
3671 for key in keys {
3672 if let Some(serde_json::Value::String(found)) = map.get(*key) {
3673 return Some(found.clone());
3674 }
3675 }
3676 for nested in map.values() {
3677 if let Some(found) = extract_string_anywhere(nested, keys) {
3678 return Some(found);
3679 }
3680 }
3681 None
3682 }
3683 serde_json::Value::Array(items) => {
3684 for item in items {
3685 if let Some(found) = extract_string_anywhere(item, keys) {
3686 return Some(found);
3687 }
3688 }
3689 None
3690 }
3691 _ => None,
3692 }
3693}
3694
3695fn map_registry_target(target: &str, base: Option<&str>) -> Option<String> {
3696 if Reference::try_from(target).is_ok() {
3697 return Some(target.to_string());
3698 }
3699 let base = base?;
3700 let normalized_base = base.trim_end_matches('/');
3701 let normalized_target = target.trim_start_matches('/');
3702 Some(format!("{normalized_base}/{normalized_target}"))
3703}
3704
3705#[derive(Clone, Debug, PartialEq, Eq)]
3706struct GreenticBizStoreTarget {
3707 tenant: String,
3708 mapped_reference: String,
3709}
3710
3711fn is_greentic_biz_store_target(target: &str) -> bool {
3712 target == "greentic-biz" || target.starts_with("greentic-biz/")
3713}
3714
3715fn parse_greentic_biz_store_target(target: &str) -> Result<GreenticBizStoreTarget, DistError> {
3716 let remainder = target.strip_prefix("greentic-biz/").ok_or_else(|| {
3717 DistError::InvalidInput(
3718 "store://greentic-biz refs must include `<tenant>/<package-path>`".into(),
3719 )
3720 })?;
3721 let (tenant, package_path) = remainder.split_once('/').ok_or_else(|| {
3722 DistError::InvalidInput(
3723 "store://greentic-biz refs must include `<tenant>/<package-path>`".into(),
3724 )
3725 })?;
3726 let tenant = tenant.trim();
3727 let package_path = package_path.trim_matches('/');
3728 if tenant.is_empty() || package_path.is_empty() {
3729 return Err(DistError::InvalidInput(
3730 "store://greentic-biz refs must include non-empty `<tenant>/<package-path>`".into(),
3731 ));
3732 }
3733 Ok(GreenticBizStoreTarget {
3734 tenant: tenant.to_string(),
3735 mapped_reference: format!("ghcr.io/greentic-biz/{package_path}"),
3736 })
3737}
3738
3739enum RefKind {
3740 Digest(String),
3741 Http(String),
3742 File(PathBuf),
3743 Oci(String),
3744 Repo(String),
3745 Store(String),
3746 #[cfg(feature = "fixture-resolver")]
3747 Fixture(String),
3748}
3749
3750fn classify_reference(input: &str) -> Result<RefKind, DistError> {
3751 if is_digest(input) {
3752 return Ok(RefKind::Digest(normalize_digest(input)));
3753 }
3754 if let Ok(url) = Url::parse(input) {
3755 match url.scheme() {
3756 "http" | "https" => return Ok(RefKind::Http(input.to_string())),
3757 "file" => {
3758 if let Ok(path) = url.to_file_path() {
3759 return Ok(RefKind::File(path));
3760 }
3761 }
3762 "oci" => {
3763 let trimmed = input.trim_start_matches("oci://");
3764 return Ok(RefKind::Oci(trimmed.to_string()));
3765 }
3766 "repo" => {
3767 let trimmed = input.trim_start_matches("repo://");
3768 return Ok(RefKind::Repo(trimmed.to_string()));
3769 }
3770 "store" => {
3771 let trimmed = input.trim_start_matches("store://");
3772 return Ok(RefKind::Store(trimmed.to_string()));
3773 }
3774 #[cfg(feature = "fixture-resolver")]
3775 "fixture" => {
3776 let trimmed = input.trim_start_matches("fixture://");
3777 return Ok(RefKind::Fixture(trimmed.to_string()));
3778 }
3779 _ => {}
3780 }
3781 }
3782 let path = Path::new(input);
3783 if path.exists() {
3784 return Ok(RefKind::File(path.to_path_buf()));
3785 }
3786 if Reference::try_from(input).is_ok() {
3787 Ok(RefKind::Oci(input.to_string()))
3788 } else {
3789 Err(DistError::InvalidRef {
3790 reference: input.to_string(),
3791 })
3792 }
3793}
3794
3795fn is_digest(s: &str) -> bool {
3796 s.starts_with("sha256:") && s.len() == "sha256:".len() + 64
3797}
3798
3799fn is_loopback_http(url: &Url) -> bool {
3800 url.scheme() == "http" && matches!(url.host_str(), Some("localhost") | Some("127.0.0.1"))
3801}
3802
3803fn ensure_secure_http_url(url: &str, allow_loopback_local: bool) -> Result<Url, DistError> {
3804 let parsed = Url::parse(url).map_err(|_| DistError::InvalidRef {
3805 reference: url.to_string(),
3806 })?;
3807 if parsed.scheme() == "https" || (allow_loopback_local && is_loopback_http(&parsed)) {
3808 Ok(parsed)
3809 } else {
3810 Err(DistError::InsecureUrl {
3811 url: url.to_string(),
3812 })
3813 }
3814}
3815
3816#[derive(Debug, serde::Deserialize)]
3817struct LockFile {
3818 #[serde(default)]
3819 schema_version: Option<u64>,
3820 #[serde(default)]
3821 components: Vec<LockEntry>,
3822}
3823
3824#[derive(Debug, serde::Deserialize)]
3825#[serde(untagged)]
3826enum LockEntry {
3827 String(String),
3828 Object(LockComponent),
3829}
3830
3831#[derive(Debug, serde::Deserialize)]
3832#[allow(dead_code)]
3833struct LockComponent {
3834 reference: Option<String>,
3835 #[serde(rename = "ref")]
3836 ref_field: Option<String>,
3837 digest: Option<String>,
3838 name: Option<String>,
3839}
3840
3841impl LockEntry {
3842 fn to_resolved(&self) -> LockResolvedEntry {
3843 match self {
3844 LockEntry::String(s) => LockResolvedEntry {
3845 reference: Some(s.clone()),
3846 digest: None,
3847 },
3848 LockEntry::Object(obj) => LockResolvedEntry {
3849 reference: obj
3850 .reference
3851 .clone()
3852 .or_else(|| obj.ref_field.clone())
3853 .or_else(|| obj.digest.clone()),
3854 digest: obj.digest.clone(),
3855 },
3856 }
3857 }
3858}
3859
3860#[derive(Clone, Debug)]
3861struct LockResolvedEntry {
3862 reference: Option<String>,
3863 digest: Option<String>,
3864}
3865
3866fn parse_lockfile(data: &str) -> Result<Vec<LockResolvedEntry>, serde_json::Error> {
3867 if let Ok(entries) = serde_json::from_str::<Vec<LockEntry>>(data) {
3868 return Ok(entries.into_iter().map(|e| e.to_resolved()).collect());
3869 }
3870 let parsed: LockFile = serde_json::from_str(data)?;
3871 let _ = parsed.schema_version;
3872 Ok(parsed
3873 .components
3874 .into_iter()
3875 .map(|c| c.to_resolved())
3876 .collect())
3877}
3878
3879#[cfg(test)]
3880mod tests {
3881 use super::*;
3882
3883 #[test]
3884 fn component_id_prefers_metadata_then_manifest_then_fallback() {
3885 let temp = tempfile::tempdir().unwrap();
3886 let cache_dir = temp.path().join("cache");
3887 fs::create_dir_all(&cache_dir).unwrap();
3888 let wasm = cache_dir.join("component.wasm");
3889 fs::write(&wasm, b"wasm").unwrap();
3890
3891 let fallback = "repo/name";
3892 assert_eq!(resolve_component_id_from_cache(&wasm, fallback), fallback);
3893
3894 fs::write(
3895 cache_dir.join("component.manifest.json"),
3896 r#"{"component_id":"from-manifest"}"#,
3897 )
3898 .unwrap();
3899 assert_eq!(
3900 resolve_component_id_from_cache(&wasm, fallback),
3901 "from-manifest"
3902 );
3903
3904 fs::write(
3905 cache_dir.join("metadata.json"),
3906 r#"{"component_id":"from-metadata"}"#,
3907 )
3908 .unwrap();
3909 assert_eq!(
3910 resolve_component_id_from_cache(&wasm, fallback),
3911 "from-metadata"
3912 );
3913 }
3914
3915 #[test]
3916 fn abi_version_is_best_effort_from_metadata_or_manifest() {
3917 let temp = tempfile::tempdir().unwrap();
3918 let cache_dir = temp.path().join("cache");
3919 fs::create_dir_all(&cache_dir).unwrap();
3920 let wasm = cache_dir.join("component.wasm");
3921 fs::write(&wasm, b"wasm").unwrap();
3922
3923 assert_eq!(resolve_abi_version_from_cache(&wasm), None);
3924
3925 fs::write(
3926 cache_dir.join("component.manifest.json"),
3927 r#"{"abi_version":"0.6.0"}"#,
3928 )
3929 .unwrap();
3930 assert_eq!(
3931 resolve_abi_version_from_cache(&wasm),
3932 Some("0.6.0".to_string())
3933 );
3934
3935 fs::write(cache_dir.join("metadata.json"), r#"{"abi":"not-semver"}"#).unwrap();
3936 assert_eq!(
3937 resolve_abi_version_from_cache(&wasm),
3938 Some("0.6.0".to_string())
3939 );
3940
3941 fs::write(cache_dir.join("metadata.json"), r#"{"abiVersion":"1.2.3"}"#).unwrap();
3942 assert_eq!(
3943 resolve_abi_version_from_cache(&wasm),
3944 Some("1.2.3".to_string())
3945 );
3946 }
3947}