Skip to main content

act_store/
fetch.rs

1//! Fetching components into the store: OCI / HTTP / local. Network I/O is
2//! isolated in thin wrappers; store-assembly logic is offline-testable.
3
4use std::path::{Path, PathBuf};
5
6use oci_client::manifest::OciImageManifest;
7
8use crate::provenance::{Provenance, Source};
9use crate::reference::Ref;
10use crate::store::{Store, StoreError, Stored};
11
12/// RFC 3339 timestamp for "now".
13fn now_rfc3339() -> String {
14    chrono::Utc::now().to_rfc3339()
15}
16
17/// Install a component from a local file path as a pinned `local` snapshot
18/// (synthesized manifest). Records the source ref as `file://<absolute path>`.
19pub fn install_local(store: &Store, path: &Path) -> Result<Stored, StoreError> {
20    let bytes = std::fs::read(path)?;
21    let provenance = Provenance {
22        source: Source::Local {
23            path: local_ref(path),
24        },
25        digest: format!("sha256:{}", crate::layout::sha256_hex(&bytes)),
26        fetched_at: now_rfc3339(),
27        name: None,
28        version: None,
29    };
30    store.put_component(&bytes, None, &provenance)
31}
32
/// Build the `file://<path>` ref string used to key a local file.
///
/// The path is canonicalized when possible; if canonicalization fails (for
/// example because the file does not exist) the path is used as given.
pub(crate) fn local_ref(path: &Path) -> String {
    let resolved = match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    };
    format!("file://{}", resolved.display())
}
38
39/// Assemble HTTP provenance from already-downloaded bytes + optional caching
40/// headers and store via a synthesized manifest. Offline — no network.
41pub fn store_http_bytes(
42    store: &Store,
43    url: &str,
44    bytes: &[u8],
45    etag: Option<String>,
46    last_modified: Option<String>,
47) -> Result<Stored, StoreError> {
48    let provenance = Provenance {
49        source: Source::Http {
50            url: url.to_string(),
51            etag,
52            last_modified,
53        },
54        digest: format!("sha256:{}", crate::layout::sha256_hex(bytes)),
55        fetched_at: now_rfc3339(),
56        name: None,
57        version: None,
58    };
59    store.put_component(bytes, None, &provenance)
60}
61
62/// Download a `.wasm` from `url` and store it. Network wrapper.
63pub async fn fetch_http(store: &Store, url: &str) -> Result<Stored, StoreError> {
64    let resp = reqwest::get(url)
65        .await
66        .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
67    if !resp.status().is_success() {
68        return Err(StoreError::Io(std::io::Error::other(format!(
69            "HTTP {} fetching {url}",
70            resp.status()
71        ))));
72    }
73    let etag = header(&resp, reqwest::header::ETAG);
74    let last_modified = header(&resp, reqwest::header::LAST_MODIFIED);
75    let bytes = resp
76        .bytes()
77        .await
78        .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
79    store_http_bytes(store, url, &bytes, etag, last_modified)
80}
81
82fn header(resp: &reqwest::Response, name: reqwest::header::HeaderName) -> Option<String> {
83    resp.headers()
84        .get(name)
85        .and_then(|v| v.to_str().ok())
86        .map(str::to_string)
87}
88
89/// Offline OCI assembly: parse `manifest_bytes`, collect config + every layer
90/// blob via `get_blob` (keyed by hex digest), store verbatim. `manifest_digest`
91/// is the upstream digest (`sha256:...`).
92pub fn assemble_oci(
93    store: &Store,
94    reference: &str,
95    manifest_bytes: &[u8],
96    manifest_digest: &str,
97    get_blob: impl Fn(&str) -> Result<Vec<u8>, StoreError>,
98) -> Result<Stored, StoreError> {
99    let manifest: OciImageManifest = serde_json::from_slice(manifest_bytes)
100        .map_err(|e| StoreError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
101
102    let mut blobs: Vec<(String, Vec<u8>)> = Vec::new();
103    let mut want = |digest: &str| -> Result<(), StoreError> {
104        let hex = strip(digest);
105        blobs.push((hex.clone(), get_blob(&hex)?));
106        Ok(())
107    };
108    want(&manifest.config.digest)?;
109    for layer in &manifest.layers {
110        want(&layer.digest)?;
111    }
112
113    let provenance = Provenance {
114        source: Source::Oci {
115            reference: reference.to_string(),
116        },
117        digest: manifest_digest.to_string(),
118        fetched_at: now_rfc3339(),
119        name: None,
120        version: None,
121    };
122    store.put_oci_artifact(manifest_bytes, &blobs, &provenance)
123}
124
125/// Pull an OCI component (manifest + blobs) and store it verbatim.
126pub async fn fetch_oci(store: &Store, reference: &str) -> Result<Stored, StoreError> {
127    use oci_client::client::{ClientConfig, ClientProtocol};
128    use oci_client::manifest::{IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_MEDIA_TYPE};
129    use oci_client::secrets::RegistryAuth;
130    use oci_client::{Client, Reference};
131
132    let oci_ref: Reference = reference
133        .strip_prefix("oci://")
134        .unwrap_or(reference)
135        .parse()
136        .map_err(|e| {
137            StoreError::Io(std::io::Error::other(format!(
138                "bad OCI ref {reference}: {e}"
139            )))
140        })?;
141    let client = Client::new(ClientConfig {
142        protocol: ClientProtocol::Https,
143        ..Default::default()
144    });
145    let auth = RegistryAuth::Anonymous;
146
147    // pull_manifest_raw returns (bytes::Bytes, String) in oci-client 0.17
148    let (manifest_raw, manifest_digest) = client
149        .pull_manifest_raw(
150            &oci_ref,
151            &auth,
152            &[OCI_IMAGE_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE],
153        )
154        .await
155        .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
156    let manifest_bytes: Vec<u8> = manifest_raw.to_vec();
157
158    let manifest: OciImageManifest = serde_json::from_slice(&manifest_bytes)
159        .map_err(|e| StoreError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
160
161    // Pre-fetch all blobs (config + layers) into a map keyed by hex digest.
162    // pull_blob in oci-client 0.17 writes to T: AsyncWrite; use Vec<u8> as sink.
163    let mut fetched: std::collections::HashMap<String, Vec<u8>> = std::collections::HashMap::new();
164    let mut descriptors = vec![manifest.config.clone()];
165    descriptors.extend(manifest.layers.iter().cloned());
166    for desc in &descriptors {
167        let mut buf: Vec<u8> = Vec::new();
168        client
169            .pull_blob(&oci_ref, desc, &mut buf)
170            .await
171            .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
172        fetched.insert(strip(&desc.digest), buf);
173    }
174
175    let stored = assemble_oci(store, reference, &manifest_bytes, &manifest_digest, |hex| {
176        fetched
177            .get(hex)
178            .cloned()
179            .ok_or_else(|| StoreError::Digest(hex.into()))
180    })?;
181    collect_referrers(
182        &client,
183        &auth,
184        &oci_ref,
185        &manifest_digest,
186        store,
187        REFERRER_DEPTH,
188    )
189    .await;
190    Ok(stored)
191}
192
/// Return the part of `digest` after its last `:` (the hex of `sha256:<hex>`),
/// or the whole string when it contains no `:`.
fn strip(digest: &str) -> String {
    digest
        .rsplit_once(':')
        .map_or(digest, |(_, hex)| hex)
        .to_string()
}
196
/// How many levels of referrers are followed when collecting the transitive
/// closure (a referrer of a referrer counts as one level deeper).
const REFERRER_DEPTH: u8 = 4;
199
200/// Offline: store one referrer's manifest + blobs against `subject_digest`.
201pub fn store_referrer(
202    store: &Store,
203    manifest_bytes: &[u8],
204    blobs: &[(String, Vec<u8>)],
205    subject_digest: &str,
206    artifact_type: Option<&str>,
207) -> Result<String, StoreError> {
208    store.put_referrer(manifest_bytes, blobs, subject_digest, artifact_type)
209}
210
211/// Build a by-digest `Reference` in the same repo as `repo`.
212fn digest_ref(
213    repo: &oci_client::Reference,
214    digest: &str,
215) -> Result<oci_client::Reference, StoreError> {
216    let d = if digest.contains(':') {
217        digest.to_string()
218    } else {
219        format!("sha256:{digest}")
220    };
221    format!("{}/{}@{}", repo.registry(), repo.repository(), d)
222        .parse()
223        .map_err(|e| StoreError::Io(std::io::Error::other(format!("bad digest ref: {e}"))))
224}
225
226/// Pull a referrer manifest's config + layer blobs into `(hex, bytes)` pairs.
227async fn referrer_blobs(
228    client: &oci_client::Client,
229    referrer_ref: &oci_client::Reference,
230    manifest_bytes: &[u8],
231) -> Result<Vec<(String, Vec<u8>)>, StoreError> {
232    let manifest: OciImageManifest = serde_json::from_slice(manifest_bytes)
233        .map_err(|e| StoreError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?;
234    let mut descriptors = vec![manifest.config.clone()];
235    descriptors.extend(manifest.layers.iter().cloned());
236    let mut out = Vec::new();
237    for d in &descriptors {
238        let mut buf: Vec<u8> = Vec::new();
239        client
240            .pull_blob(referrer_ref, d, &mut buf)
241            .await
242            .map_err(|e| StoreError::Io(std::io::Error::other(e)))?;
243        out.push((strip(&d.digest), buf));
244    }
245    Ok(out)
246}
247
248/// Pull every connected artifact (referrer) of component manifest
249/// `subject_digest` (`sha256:...`) in `repo`, store it, and recurse to the
250/// transitive closure (depth-capped). Best-effort: a registry without the
251/// referrers API yields nothing; per-referrer errors are logged and skipped so
252/// referrer collection never fails the component pull.
253async fn collect_referrers(
254    client: &oci_client::Client,
255    auth: &oci_client::secrets::RegistryAuth,
256    repo: &oci_client::Reference,
257    subject_digest: &str,
258    store: &Store,
259    depth: u8,
260) {
261    use oci_client::manifest::{IMAGE_MANIFEST_MEDIA_TYPE, OCI_IMAGE_MEDIA_TYPE};
262    if depth == 0 {
263        return;
264    }
265    let subject_ref = match digest_ref(repo, subject_digest) {
266        Ok(r) => r,
267        Err(_) => return,
268    };
269    let index = match client.pull_referrers(&subject_ref, None).await {
270        Ok(idx) => idx,
271        Err(e) => {
272            tracing::debug!(%subject_digest, error = %e, "no referrers / referrers API unavailable");
273            return;
274        }
275    };
276    for desc in index.manifests {
277        let ref_digest = desc.digest.clone();
278        let referrer_ref = match digest_ref(repo, &ref_digest) {
279            Ok(r) => r,
280            Err(_) => continue,
281        };
282        let pulled = client
283            .pull_manifest_raw(
284                &referrer_ref,
285                auth,
286                &[OCI_IMAGE_MEDIA_TYPE, IMAGE_MANIFEST_MEDIA_TYPE],
287            )
288            .await;
289        let (m_bytes, m_digest) = match pulled {
290            Ok((b, d)) => (b.to_vec(), d),
291            Err(e) => {
292                tracing::warn!(%ref_digest, error = %e, "failed to pull referrer manifest");
293                continue;
294            }
295        };
296        let blobs = match referrer_blobs(client, &referrer_ref, &m_bytes).await {
297            Ok(b) => b,
298            Err(e) => {
299                tracing::warn!(%ref_digest, error = %e, "failed to pull referrer blobs");
300                continue;
301            }
302        };
303        let artifact_type = desc.artifact_type.clone();
304        if let Err(e) =
305            store.put_referrer(&m_bytes, &blobs, subject_digest, artifact_type.as_deref())
306        {
307            tracing::warn!(%ref_digest, error = %e, "failed to store referrer");
308            continue;
309        }
310        Box::pin(collect_referrers(
311            client,
312            auth,
313            repo,
314            &m_digest,
315            store,
316            depth - 1,
317        ))
318        .await;
319    }
320}
321
322/// Fetch `reference` into the store regardless of kind. Local files are
323/// installed as pinned snapshots.
324pub async fn pull(store: &Store, reference: &str) -> Result<Stored, StoreError> {
325    let parsed: Ref = reference
326        .parse()
327        .map_err(|e| StoreError::Io(std::io::Error::other(format!("{e}"))))?;
328    match parsed {
329        Ref::Local(path) => install_local(store, &path),
330        Ref::Http(url) => fetch_http(store, url.as_str()).await,
331        Ref::Oci(r) => fetch_oci(store, &format!("oci://{r}")).await,
332        Ref::Name(n) => Err(StoreError::Io(std::io::Error::other(format!(
333            "registry name resolution not implemented: {n}"
334        )))),
335    }
336}
337
338/// The canonical store-lookup ref for a user-supplied reference. Must match the
339/// ref that `pull` records when storing: local -> `file://<canonical>`,
340/// oci -> `oci://<ref>`, http -> the URL string.
341pub(crate) fn lookup_ref(reference: &str) -> String {
342    match reference.parse::<Ref>() {
343        Ok(Ref::Local(path)) => local_ref(&path),
344        Ok(Ref::Oci(r)) => format!("oci://{r}"),
345        Ok(Ref::Http(url)) => url.to_string(),
346        _ => reference.to_string(),
347    }
348}
349
350/// Read-through resolve: return the wasm blob path for `reference`, pulling it
351/// into the store first if absent.
352pub async fn ensure(store: &Store, reference: &str) -> Result<PathBuf, StoreError> {
353    let key = lookup_ref(reference);
354    if let Some(path) = store.resolve(&key)? {
355        return Ok(path);
356    }
357    pull(store, reference).await?;
358    store.resolve(&key)?.ok_or_else(|| {
359        StoreError::Io(std::io::Error::other(format!(
360            "resolve failed after pull: {reference}"
361        )))
362    })
363}
364
/// What an [`update`] call did.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum UpdateOutcome {
    /// The digest after re-pulling equals the stored one.
    Unchanged,
    /// The digest changed; `from` is the previously stored digest and `to`
    /// the newly pulled one (both `sha256:...`).
    Updated { from: String, to: String },
    /// No stored entry matches the reference.
    NotStored,
}
375
376/// Re-resolve `reference` and re-pull if the digest moved.
377pub async fn update(store: &Store, reference: &str) -> Result<UpdateOutcome, StoreError> {
378    let key = lookup_ref(reference);
379    let before = store
380        .list()?
381        .into_iter()
382        .find(|s| source_ref(&s.provenance) == key)
383        .map(|s| s.provenance.digest);
384    let Some(before) = before else {
385        return Ok(UpdateOutcome::NotStored);
386    };
387    let restored = pull(store, reference).await?;
388    let after = restored.provenance.digest;
389    if after == before {
390        Ok(UpdateOutcome::Unchanged)
391    } else {
392        Ok(UpdateOutcome::Updated {
393            from: before,
394            to: after,
395        })
396    }
397}
398
399/// The `source.ref` (as stored) of a provenance, for matching against a key.
400fn source_ref(p: &Provenance) -> &str {
401    match &p.source {
402        Source::Oci { reference } => reference,
403        Source::Http { url, .. } => url,
404        Source::Local { path } => path,
405    }
406}
407
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Open a store rooted in a fresh temp dir. The `TempDir` guard is
    /// returned too so the directory lives as long as the test needs it.
    fn temp_store() -> (TempDir, Store) {
        let dir = TempDir::new().unwrap();
        let store = Store::open(dir.path()).unwrap();
        (dir, store)
    }

    /// The recorded `file://...` ref of a locally-installed component.
    fn local_source_ref(stored: &Stored) -> String {
        match &stored.provenance.source {
            Source::Local { path } => path.clone(),
            _ => unreachable!(),
        }
    }

    #[test]
    fn install_local_then_resolve() {
        let (dir, store) = temp_store();
        let wasm_path = dir.path().join("c.wasm");
        std::fs::write(&wasm_path, b"local-bytes").unwrap();

        let stored = install_local(&store, &wasm_path).unwrap();
        assert!(matches!(stored.provenance.source, Source::Local { .. }));

        let file_ref = local_source_ref(&stored);
        let resolved = store.resolve(&file_ref).unwrap().expect("hit");
        assert_eq!(std::fs::read(resolved).unwrap(), b"local-bytes");
    }

    #[test]
    fn store_http_bytes_records_http_provenance_with_headers() {
        let (_dir, store) = temp_store();
        let stored = store_http_bytes(
            &store,
            "https://cdn.example.com/x.wasm",
            b"http-bytes",
            Some("\"etag123\"".into()),
            Some("Wed, 21 May 2026 00:00:00 GMT".into()),
        )
        .unwrap();

        let Source::Http {
            url,
            etag,
            last_modified,
        } = stored.provenance.source
        else {
            panic!("expected Http source");
        };
        assert_eq!(url, "https://cdn.example.com/x.wasm");
        assert_eq!(etag.as_deref(), Some("\"etag123\""));
        assert!(last_modified.is_some());

        let hit = store.resolve("https://cdn.example.com/x.wasm").unwrap();
        assert!(hit.is_some());
    }

    #[test]
    fn assemble_oci_stores_verbatim_and_resolves() {
        let (_dir, store) = temp_store();
        let wasm = b"\0asm\x01\0\0\0oci";
        let cfg = b"\xA0";
        let wasm_hex = crate::layout::sha256_hex(wasm);
        let cfg_hex = crate::layout::sha256_hex(cfg);
        let manifest = format!(
            r#"{{"schemaVersion":2,"mediaType":"application/vnd.oci.image.manifest.v1+json","config":{{"mediaType":"application/vnd.actcore.component.config.v1+cbor","digest":"sha256:{cfg_hex}","size":{c}}},"layers":[{{"mediaType":"application/wasm","digest":"sha256:{wasm_hex}","size":{w}}}]}}"#,
            c = cfg.len(),
            w = wasm.len(),
        )
        .into_bytes();
        let upstream = crate::layout::sha256_hex(&manifest);
        let upstream_digest = format!("sha256:{upstream}");

        let blobs = std::collections::HashMap::from([
            (wasm_hex, wasm.to_vec()),
            (cfg_hex, cfg.to_vec()),
        ]);
        let stored = assemble_oci(
            &store,
            "oci://ghcr.io/x/oci:1",
            &manifest,
            &upstream_digest,
            |hex| match blobs.get(hex) {
                Some(data) => Ok(data.clone()),
                None => Err(StoreError::Digest(hex.into())),
            },
        )
        .unwrap();

        assert_eq!(stored.manifest_digest, upstream);
        assert_eq!(stored.provenance.digest, upstream_digest);
        let resolved = store.resolve("oci://ghcr.io/x/oci:1").unwrap().unwrap();
        assert_eq!(std::fs::read(resolved).unwrap(), wasm);
    }

    #[tokio::test]
    #[ignore = "network: fetches a real .wasm over HTTP"]
    async fn fetch_http_live() {
        let (_dir, store) = temp_store();
        let url = "https://github.com/actcore/act-cli/raw/main/README.md";
        let stored = fetch_http(&store, url).await.unwrap();
        assert!(stored.provenance.digest.starts_with("sha256:"));
        assert!(store.resolve(url).unwrap().is_some());
    }

    #[test]
    fn store_referrer_offline() {
        let (_dir, store) = temp_store();
        let subject = "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08";
        let m = br#"{"schemaVersion":2,"mediaType":"application/vnd.oci.image.manifest.v1+json","config":{"mediaType":"application/vnd.oci.empty.v1+json","digest":"sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a","size":2},"layers":[]}"#.to_vec();
        let cfg = b"{}".to_vec();
        let cfg_hex = crate::layout::sha256_hex(&cfg);

        store_referrer(
            &store,
            &m,
            &[(cfg_hex, cfg)],
            subject,
            Some("application/spdx+json"),
        )
        .unwrap();

        let referrers = store
            .list_referrers_by_digest(
                "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
            )
            .unwrap();
        assert_eq!(referrers.len(), 1);
    }

    #[tokio::test]
    #[ignore = "network: pulls a component AND its referrers from ghcr.io"]
    async fn fetch_oci_with_referrers_live() {
        let (_dir, store) = temp_store();
        let r = "oci://ghcr.io/actpkg/time:0.2.0";
        let stored = fetch_oci(&store, r).await.unwrap();
        assert!(store.resolve(r).unwrap().is_some());
        let refs = store
            .list_referrers_by_digest(&stored.manifest_digest)
            .unwrap();
        eprintln!("referrers collected for time:0.2.0: {}", refs.len());
    }

    #[tokio::test]
    #[ignore = "network: pulls a real component from ghcr.io"]
    async fn fetch_oci_live() {
        let (_dir, store) = temp_store();
        let r = "oci://ghcr.io/actpkg/time:0.2.0";
        let stored = fetch_oci(&store, r).await.unwrap();
        assert!(stored.provenance.digest.starts_with("sha256:"));
        assert!(store.resolve(r).unwrap().is_some());
    }

    #[tokio::test]
    async fn pull_dispatches_local_by_ref_kind() {
        let (dir, store) = temp_store();
        let p = dir.path().join("d.wasm");
        std::fs::write(&p, b"dispatch").unwrap();
        let bare = p.display().to_string();
        let stored = pull(&store, &bare).await.unwrap();
        assert!(matches!(stored.provenance.source, Source::Local { .. }));
    }

    #[tokio::test]
    async fn ensure_local_by_bare_path_is_read_through() {
        let (dir, store) = temp_store();
        let p = dir.path().join("f.wasm");
        std::fs::write(&p, b"bare").unwrap();
        let bare = p.display().to_string();
        let first = ensure(&store, &bare).await.unwrap(); // pulls
        let second = ensure(&store, &bare).await.unwrap(); // store hit
        assert_eq!(first, second);
        assert_eq!(std::fs::read(&first).unwrap(), b"bare");
    }

    #[tokio::test]
    async fn update_local_noop_then_changed() {
        let (dir, store) = temp_store();
        let p = dir.path().join("u.wasm");
        std::fs::write(&p, b"v1").unwrap();
        let stored = pull(&store, &p.display().to_string()).await.unwrap();
        let r = local_source_ref(&stored);

        // Same bytes on disk: the digest must not move.
        let first = update(&store, &r).await.unwrap();
        assert!(matches!(first, UpdateOutcome::Unchanged));

        // New bytes on disk: the digest must move.
        std::fs::write(&p, b"v2-bigger").unwrap();
        match update(&store, &r).await.unwrap() {
            UpdateOutcome::Updated { from, to } => assert_ne!(from, to),
            other => panic!("expected Updated, got {other:?}"),
        }
    }
}