1use std::path::{Path, PathBuf};
5
6use oci_client::manifest::OciImageManifest;
7
8use crate::provenance::{Provenance, Source};
9use crate::reference::Ref;
10use crate::store::{Store, StoreError, Stored};
11
12fn now_rfc3339() -> String {
14 chrono::Utc::now().to_rfc3339()
15}
16
17pub fn install_local(store: &Store, path: &Path) -> Result<Stored, StoreError> {
20 let bytes = std::fs::read(path)?;
21 let provenance = Provenance {
22 source: Source::Local {
23 path: local_ref(path),
24 },
25 digest: format!("sha256:{}", crate::layout::sha256_hex(&bytes)),
26 fetched_at: now_rfc3339(),
27 name: None,
28 version: None,
29 };
30 store.put_component(&bytes, None, &provenance)
31}
32
/// Builds the `file://…` reference string used to identify a local path.
///
/// The path is canonicalized when possible; if canonicalization fails (for
/// example because the path does not exist) the path is used as given.
pub(crate) fn local_ref(path: &Path) -> String {
    let resolved = match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        // Best effort: fall back to the caller's spelling of the path.
        Err(_) => path.to_path_buf(),
    };
    format!("file://{}", resolved.display())
}
38
39pub fn store_http_bytes(
42 store: &Store,
43 url: &str,
44 bytes: &[u8],
45 etag: Option<String>,
46 last_modified: Option<String>,
47) -> Result<Stored, StoreError> {
48 let provenance = Provenance {
49 source: Source::Http {
50 url: url.to_string(),
51 etag,
52 last_modified,
53 },
54 digest: format!("sha256:{}", crate::layout::sha256_hex(bytes)),
55 fetched_at: now_rfc3339(),
56 name: None,
57 version: None,
58 };
59 store.put_component(bytes, None, &provenance)
60}
61
62pub async fn fetch_http(store: &Store, url: &str) -> Result<Stored, StoreError> {
64 let resp = reqwest::get(url)
65 .await
66 .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
67 if !resp.status().is_success() {
68 return Err(StoreError::Io(std::io::Error::other(format!(
69 "HTTP {} fetching {url}",
70 resp.status()
71 ))));
72 }
73 let etag = header(&resp, reqwest::header::ETAG);
74 let last_modified = header(&resp, reqwest::header::LAST_MODIFIED);
75 let bytes = resp
76 .bytes()
77 .await
78 .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
79 store_http_bytes(store, url, &bytes, etag, last_modified)
80}
81
82fn header(resp: &reqwest::Response, name: reqwest::header::HeaderName) -> Option<String> {
83 resp.headers()
84 .get(name)
85 .and_then(|v| v.to_str().ok())
86 .map(str::to_string)
87}
88
89pub fn assemble_oci(
93 store: &Store,
94 reference: &str,
95 manifest_bytes: &[u8],
96 manifest_digest: &str,
97 get_blob: impl Fn(&str) -> Result<Vec<u8>, StoreError>,
98) -> Result<Stored, StoreError> {
99 let manifest: OciImageManifest = serde_json::from_slice(manifest_bytes)
100 .map_err(|e| StoreError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
101
102 let mut blobs: Vec<(String, Vec<u8>)> = Vec::new();
103 let mut want = |digest: &str| -> Result<(), StoreError> {
104 let hex = strip(digest);
105 blobs.push((hex.clone(), get_blob(&hex)?));
106 Ok(())
107 };
108 want(&manifest.config.digest)?;
109 for layer in &manifest.layers {
110 want(&layer.digest)?;
111 }
112
113 let provenance = Provenance {
114 source: Source::Oci {
115 reference: reference.to_string(),
116 },
117 digest: manifest_digest.to_string(),
118 fetched_at: now_rfc3339(),
119 name: None,
120 version: None,
121 };
122 store.put_oci_artifact(manifest_bytes, &blobs, &provenance)
123}
124
125pub async fn fetch_oci(store: &Store, reference: &str) -> Result<Stored, StoreError> {
127 use oci_client::client::{ClientConfig, ClientProtocol};
128 use oci_client::manifest::{IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_MEDIA_TYPE};
129 use oci_client::secrets::RegistryAuth;
130 use oci_client::{Client, Reference};
131
132 let oci_ref: Reference = reference
133 .strip_prefix("oci://")
134 .unwrap_or(reference)
135 .parse()
136 .map_err(|e| {
137 StoreError::Io(std::io::Error::other(format!(
138 "bad OCI ref {reference}: {e}"
139 )))
140 })?;
141 let client = Client::new(ClientConfig {
142 protocol: ClientProtocol::Https,
143 ..Default::default()
144 });
145 let auth = RegistryAuth::Anonymous;
146
147 let (manifest_raw, manifest_digest) = client
149 .pull_manifest_raw(
150 &oci_ref,
151 &auth,
152 &[OCI_IMAGE_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE],
153 )
154 .await
155 .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
156 let manifest_bytes: Vec<u8> = manifest_raw.to_vec();
157
158 let manifest: OciImageManifest = serde_json::from_slice(&manifest_bytes)
159 .map_err(|e| StoreError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
160
161 let mut fetched: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
164 let mut descriptors = vec![manifest.config.clone()];
165 descriptors.extend(manifest.layers.iter().cloned());
166 for desc in &descriptors {
167 let mut buf: Vec<u8> = Vec::new();
168 client
169 .pull_blob(&oci_ref, desc, &mut buf)
170 .await
171 .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
172 fetched.insert(strip(&desc.digest), buf);
173 }
174
175 let stored = assemble_oci(store, reference, &manifest_bytes, &manifest_digest, |hex| {
176 fetched
177 .get(hex)
178 .cloned()
179 .ok_or_else(|| StoreError::Digest(hex.into()))
180 })?;
181 collect_referrers(
182 &client,
183 &auth,
184 &oci_ref,
185 &manifest_digest,
186 store,
187 REFERRER_DEPTH,
188 )
189 .await;
190 Ok(stored)
191}
192
/// Returns the part of `digest` after its last `:` (e.g. `sha256:abc` → `abc`).
///
/// A string without any `:` is returned unchanged.
fn strip(digest: &str) -> String {
    match digest.rsplit_once(':') {
        Some((_prefix, hex)) => hex.to_owned(),
        None => digest.to_owned(),
    }
}
196
/// Initial `depth` that `fetch_oci` passes to `collect_referrers`.
///
/// `collect_referrers` returns immediately at depth 0 and recurses with
/// `depth - 1` for every referrer it stores, so this bounds how many levels of
/// referrers-of-referrers are followed.
const REFERRER_DEPTH: u8 = 4;
199
/// Stores a referrer manifest and its blobs for the subject `subject_digest`.
///
/// This is a direct pass-through to `Store::put_referrer`; all arguments are
/// forwarded unchanged and its result is returned as-is.
///
/// * `manifest_bytes` – raw bytes of the referrer manifest.
/// * `blobs` – `(digest, bytes)` pairs for the blobs that go with the manifest
///   (the in-file test passes the bare hex digest as the key).
/// * `subject_digest` – digest of the artifact the referrer points at.
/// * `artifact_type` – optional artifact type of the referrer.
///
/// NOTE(review): the returned `String` is presumably the stored referrer's
/// digest — confirm against `Store::put_referrer`.
///
/// # Errors
///
/// Propagates any error from `Store::put_referrer`.
pub fn store_referrer(
    store: &Store,
    manifest_bytes: &[u8],
    blobs: &[(String, Vec<u8>)],
    subject_digest: &str,
    artifact_type: Option<&str>,
) -> Result<String, StoreError> {
    store.put_referrer(manifest_bytes, blobs, subject_digest, artifact_type)
}
210
211fn digest_ref(
213 repo: &oci_client::Reference,
214 digest: &str,
215) -> Result<oci_client::Reference, StoreError> {
216 let d = if digest.contains(':') {
217 digest.to_string()
218 } else {
219 format!("sha256:{digest}")
220 };
221 format!("{}/{}@{}", repo.registry(), repo.repository(), d)
222 .parse()
223 .map_err(|e| StoreError::Io(std::io::Error::other(format!("bad digest ref: {e}"))))
224}
225
226async fn referrer_blobs(
228 client: &oci_client::Client,
229 referrer_ref: &oci_client::Reference,
230 manifest_bytes: &[u8],
231) -> Result<Vec<(String, Vec<u8>)>, StoreError> {
232 let manifest: OciImageManifest = serde_json::from_slice(manifest_bytes)
233 .map_err(|e| StoreError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
234 let mut descriptors = vec![manifest.config.clone()];
235 descriptors.extend(manifest.layers.iter().cloned());
236 let mut out = Vec::new();
237 for d in &descriptors {
238 let mut buf: Vec<u8> = Vec::new();
239 client
240 .pull_blob(referrer_ref, d, &mut buf)
241 .await
242 .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
243 out.push((strip(&d.digest), buf));
244 }
245 Ok(out)
246}
247
248async fn collect_referrers(
254 client: &oci_client::Client,
255 auth: &oci_client::secrets::RegistryAuth,
256 repo: &oci_client::Reference,
257 subject_digest: &str,
258 store: &Store,
259 depth: u8,
260) {
261 use oci_client::manifest::{IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_MEDIA_TYPE};
262 if depth == 0 {
263 return;
264 }
265 let subject_ref = match digest_ref(repo, subject_digest) {
266 Ok(r) => r,
267 Err(_) => return,
268 };
269 let index = match client.pull_referrers(&subject_ref, None).await {
270 Ok(idx) => idx,
271 Err(e) => {
272 tracing::debug!(%subject_digest, error = %e, "no referrers / referrers API unavailable");
273 return;
274 }
275 };
276 for desc in index.manifests {
277 let ref_digest = desc.digest.clone();
278 let referrer_ref = match digest_ref(repo, &ref_digest) {
279 Ok(r) => r,
280 Err(_) => continue,
281 };
282 let pulled = client
283 .pull_manifest_raw(
284 &referrer_ref,
285 auth,
286 &[OCI_IMAGE_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE],
287 )
288 .await;
289 let (m_bytes, m_digest) = match pulled {
290 Ok((b, d)) => (b.to_vec(), d),
291 Err(e) => {
292 tracing::warn!(%ref_digest, error = %e, "failed to pull referrer manifest");
293 continue;
294 }
295 };
296 let blobs = match referrer_blobs(client, &referrer_ref, &m_bytes).await {
297 Ok(b) => b,
298 Err(e) => {
299 tracing::warn!(%ref_digest, error = %e, "failed to pull referrer blobs");
300 continue;
301 }
302 };
303 let artifact_type = desc.artifact_type.clone();
304 if let Err(e) =
305 store.put_referrer(&m_bytes, &blobs, subject_digest, artifact_type.as_deref())
306 {
307 tracing::warn!(%ref_digest, error = %e, "failed to store referrer");
308 continue;
309 }
310 Box::pin(collect_referrers(
311 client,
312 auth,
313 repo,
314 &m_digest,
315 store,
316 depth - 1,
317 ))
318 .await;
319 }
320}
321
322pub async fn pull(store: &Store, reference: &str) -> Result<Stored, StoreError> {
325 let parsed: Ref = reference
326 .parse()
327 .map_err(|e| StoreError::Io(std::io::Error::other(format!("{e}"))))?;
328 match parsed {
329 Ref::Local(path) => install_local(store, &path),
330 Ref::Http(url) => fetch_http(store, url.as_str()).await,
331 Ref::Oci(r) => fetch_oci(store, &format!("oci://{r}")).await,
332 Ref::Name(n) => Err(StoreError::Io(std::io::Error::other(format!(
333 "registry name resolution not implemented: {n}"
334 )))),
335 }
336}
337
338pub(crate) fn lookup_ref(reference: &str) -> String {
342 match reference.parse::<Ref>() {
343 Ok(Ref::Local(path)) => local_ref(&path),
344 Ok(Ref::Oci(r)) => format!("oci://{r}"),
345 Ok(Ref::Http(url)) => url.to_string(),
346 _ => reference.to_string(),
347 }
348}
349
350pub async fn ensure(store: &Store, reference: &str) -> Result<PathBuf, StoreError> {
353 let key = lookup_ref(reference);
354 if let Some(path) = store.resolve(&key)? {
355 return Ok(path);
356 }
357 pull(store, reference).await?;
358 store.resolve(&key)?.ok_or_else(|| {
359 StoreError::Io(std::io::Error::other(format!(
360 "resolve failed after pull: {reference}"
361 )))
362 })
363}
364
/// Result of [`update`]: what happened when a stored reference was re-pulled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum UpdateOutcome {
    /// The digest after re-pulling equals the digest that was already stored.
    Unchanged,
    /// The digest changed; `from` is the previously stored digest and `to` is
    /// the digest of the freshly pulled content.
    Updated { from: String, to: String },
    /// No stored entry has a source matching the reference, so nothing was pulled.
    NotStored,
}
375
376pub async fn update(store: &Store, reference: &str) -> Result<UpdateOutcome, StoreError> {
378 let key = lookup_ref(reference);
379 let before = store
380 .list()?
381 .into_iter()
382 .find(|s| source_ref(&s.provenance) == key)
383 .map(|s| s.provenance.digest);
384 let Some(before) = before else {
385 return Ok(UpdateOutcome::NotStored);
386 };
387 let restored = pull(store, reference).await?;
388 let after = restored.provenance.digest;
389 if after == before {
390 Ok(UpdateOutcome::Unchanged)
391 } else {
392 Ok(UpdateOutcome::Updated {
393 from: before,
394 to: after,
395 })
396 }
397}
398
399fn source_ref(p: &Provenance) -> &str {
401 match &p.source {
402 Source::Oci { reference } => reference,
403 Source::Http { url, .. } => url,
404 Source::Local { path } => path,
405 }
406}
407
// Unit tests for the fetch/pull helpers. Tests that need network access are
// marked `#[ignore]`; the others run against a store opened in a temp dir.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // A locally installed file gets `Source::Local` provenance, and the
    // recorded path resolves back to the original bytes.
    #[test]
    fn install_local_then_resolve() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let wasm_path = dir.path().join("c.wasm");
        std::fs::write(&wasm_path, b"local-bytes").unwrap();
        let stored = install_local(&store, &wasm_path).unwrap();
        assert!(matches!(stored.provenance.source, Source::Local { .. }));
        let file_ref = match &stored.provenance.source {
            Source::Local { path } => path.clone(),
            _ => unreachable!(),
        };
        let resolved = store.resolve(&file_ref).unwrap().expect("hit");
        assert_eq!(std::fs::read(resolved).unwrap(), b"local-bytes");
    }

    // `store_http_bytes` records the URL, ETag and Last-Modified values in the
    // provenance, and the URL then resolves in the store.
    #[test]
    fn store_http_bytes_records_http_provenance_with_headers() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let stored = store_http_bytes(
            &store,
            "https://cdn.example.com/x.wasm",
            b"http-bytes",
            Some("\"etag123\"".into()),
            Some("Wed, 21 May 2026 00:00:00 GMT".into()),
        )
        .unwrap();
        match stored.provenance.source {
            Source::Http {
                url,
                etag,
                last_modified,
            } => {
                assert_eq!(url, "https://cdn.example.com/x.wasm");
                assert_eq!(etag.as_deref(), Some("\"etag123\""));
                assert!(last_modified.is_some());
            }
            _ => panic!("expected Http source"),
        }
        assert!(
            store
                .resolve("https://cdn.example.com/x.wasm")
                .unwrap()
                .is_some()
        );
    }

    // Offline OCI assembly: a hand-built manifest with one config blob and one
    // wasm layer, with blobs served from an in-memory map keyed by hex digest.
    #[test]
    fn assemble_oci_stores_verbatim_and_resolves() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let wasm = b"\0asm\x01\0\0\0oci";
        let wasm_hex = crate::layout::sha256_hex(wasm);
        let cfg = b"\xA0";
        let cfg_hex = crate::layout::sha256_hex(cfg);
        let manifest = format!(
            r#"{{"schemaVersion":2,"mediaType":"application/vnd.oci.image.manifest.v1+json","config":{{"mediaType":"application/vnd.actcore.component.config.v1+cbor","digest":"sha256:{cfg_hex}","size":{c}}},"layers":[{{"mediaType":"application/wasm","digest":"sha256:{wasm_hex}","size":{w}}}]}}"#,
            c = cfg.len(), w = wasm.len(),
        ).into_bytes();
        // Digest of the manifest bytes exactly as they are passed in.
        let upstream = crate::layout::sha256_hex(&manifest);
        let mut blobs = std::collections::HashMap::new();
        blobs.insert(wasm_hex.clone(), wasm.to_vec());
        blobs.insert(cfg_hex.clone(), cfg.to_vec());
        let stored = assemble_oci(
            &store,
            "oci://ghcr.io/x/oci:1",
            &manifest,
            &format!("sha256:{upstream}"),
            |hex| {
                blobs
                    .get(hex)
                    .cloned()
                    .ok_or_else(|| StoreError::Digest(hex.into()))
            },
        )
        .unwrap();
        // `manifest_digest` is the bare hex; the provenance keeps the prefixed form.
        assert_eq!(stored.manifest_digest, upstream);
        assert_eq!(stored.provenance.digest, format!("sha256:{upstream}"));
        // Resolving the reference yields the wasm layer's bytes.
        assert_eq!(
            std::fs::read(store.resolve("oci://ghcr.io/x/oci:1").unwrap().unwrap()).unwrap(),
            wasm
        );
    }

    // Live HTTP fetch (ignored by default): the stored digest is sha256-prefixed
    // and the URL resolves afterwards.
    #[tokio::test]
    #[ignore = "network: fetches a real .wasm over HTTP"]
    async fn fetch_http_live() {
        let url = "https://github.com/actcore/act-cli/raw/main/README.md";
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let stored = fetch_http(&store, url).await.unwrap();
        assert!(stored.provenance.digest.starts_with("sha256:"));
        assert!(store.resolve(url).unwrap().is_some());
    }

    // Stores one referrer offline and checks it is listed under the subject's
    // bare hex digest (the subject is passed in with a `sha256:` prefix).
    #[test]
    fn store_referrer_offline() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let subject = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08";
        let m = br#"{"schemaVersion":2,"mediaType":"application/vnd.oci.image.manifest.v1+json","config":{"mediaType":"application/vnd.oci.empty.v1+json","digest":"sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a","size":2},"layers":[]}"#.to_vec();
        let cfg = b"{}".to_vec();
        let cfg_hex = crate::layout::sha256_hex(&cfg);
        super::store_referrer(
            &store,
            &m,
            &[(cfg_hex, cfg)],
            subject,
            Some("application/spdx+json"),
        )
        .unwrap();
        assert_eq!(
            store
                .list_referrers_by_digest(
                    "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
                )
                .unwrap()
                .len(),
            1
        );
    }

    // Live OCI pull including referrer collection (ignored by default). The
    // referrer count is only printed, not asserted.
    #[tokio::test]
    #[ignore = "network: pulls a component AND its referrers from ghcr.io"]
    async fn fetch_oci_with_referrers_live() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let r = "oci://ghcr.io/actpkg/time:0.2.0";
        let stored = super::fetch_oci(&store, r).await.unwrap();
        assert!(store.resolve(r).unwrap().is_some());
        let refs = store
            .list_referrers_by_digest(&stored.manifest_digest)
            .unwrap();
        eprintln!("referrers collected for time:0.2.0: {}", refs.len());
    }

    // Live OCI pull (ignored by default): digest is sha256-prefixed and the
    // reference resolves afterwards.
    #[tokio::test]
    #[ignore = "network: pulls a real component from ghcr.io"]
    async fn fetch_oci_live() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let r = "oci://ghcr.io/actpkg/time:0.2.0";
        let stored = fetch_oci(&store, r).await.unwrap();
        assert!(stored.provenance.digest.starts_with("sha256:"));
        assert!(store.resolve(r).unwrap().is_some());
    }

    // `pull` given a plain filesystem path ends up with `Source::Local` provenance.
    #[tokio::test]
    async fn pull_dispatches_local_by_ref_kind() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let p = dir.path().join("d.wasm");
        std::fs::write(&p, b"dispatch").unwrap();
        let stored = super::pull(&store, &p.display().to_string()).await.unwrap();
        assert!(matches!(stored.provenance.source, Source::Local { .. }));
    }

    // Calling `ensure` twice with the same bare path returns the same stored
    // path both times, and that path holds the file's bytes.
    #[tokio::test]
    async fn ensure_local_by_bare_path_is_read_through() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let p = dir.path().join("f.wasm");
        std::fs::write(&p, b"bare").unwrap();
        let bare = p.display().to_string();
        let a = super::ensure(&store, &bare).await.unwrap();
        let b = super::ensure(&store, &bare).await.unwrap();
        assert_eq!(a, b);
        assert_eq!(std::fs::read(&a).unwrap(), b"bare");
    }

    // `update` reports `Unchanged` while the file is untouched and `Updated`
    // with differing digests once the file's contents change.
    #[tokio::test]
    async fn update_local_noop_then_changed() {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        let p = dir.path().join("u.wasm");
        std::fs::write(&p, b"v1").unwrap();
        let stored = super::pull(&store, &p.display().to_string()).await.unwrap();
        // Use the recorded `file://` reference as the update key.
        let r = match &stored.provenance.source {
            Source::Local { path } => path.clone(),
            _ => unreachable!(),
        };
        assert!(matches!(
            super::update(&store, &r).await.unwrap(),
            super::UpdateOutcome::Unchanged
        ));
        std::fs::write(&p, b"v2-bigger").unwrap();
        match super::update(&store, &r).await.unwrap() {
            super::UpdateOutcome::Updated { from, to } => assert_ne!(from, to),
            other => panic!("expected Updated, got {other:?}"),
        }
    }
}