wasm_pkg_loader/source/
local.rs

1use std::path::PathBuf;
2
3use anyhow::anyhow;
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures_util::{stream::BoxStream, StreamExt, TryStreamExt};
7use serde::Deserialize;
8use tokio_util::io::ReaderStream;
9use wasm_pkg_common::{config::RegistryConfig, package::Version};
10
11use crate::{
12    source::{PackageSource, VersionInfo},
13    ContentDigest, Error, PackageRef, Release,
14};
15
16#[derive(Clone, Debug, Deserialize)]
17pub struct LocalConfig {
18    pub root: PathBuf,
19}
20
21/// A simple local filesystem-based PackageSource.
22///
23/// Each package release is a file: `<root>/<namespace>/<name>/<version>.wasm`
24pub struct LocalSource {
25    root: PathBuf,
26}
27
28impl LocalSource {
29    pub fn new(registry_config: RegistryConfig) -> Result<Self, Error> {
30        let config = registry_config
31            .backend_config::<LocalConfig>("local")?
32            .ok_or_else(|| {
33                Error::InvalidConfig(anyhow!("'local' backend require configuration"))
34            })?;
35        Ok(Self { root: config.root })
36    }
37
38    fn package_dir(&self, package: &PackageRef) -> PathBuf {
39        self.root
40            .join(package.namespace().as_ref())
41            .join(package.name().as_ref())
42    }
43
44    fn version_path(&self, package: &PackageRef, version: &Version) -> PathBuf {
45        self.package_dir(package).join(format!("{version}.wasm"))
46    }
47}
48
49#[async_trait]
50impl PackageSource for LocalSource {
51    async fn list_all_versions(&mut self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
52        let mut versions = vec![];
53        let package_dir = self.package_dir(package);
54        tracing::debug!(?package_dir, "Reading versions from path");
55        let mut entries = tokio::fs::read_dir(package_dir).await?;
56        while let Some(entry) = entries.next_entry().await? {
57            let path = entry.path();
58            if path.extension() != Some("wasm".as_ref()) {
59                continue;
60            }
61            let Some(version) = path
62                .file_stem()
63                .unwrap()
64                .to_str()
65                .and_then(|stem| Version::parse(stem).ok())
66            else {
67                tracing::warn!("invalid package file name at {path:?}");
68                continue;
69            };
70            versions.push(VersionInfo {
71                version,
72                yanked: false,
73            });
74        }
75        Ok(versions)
76    }
77
78    async fn get_release(
79        &mut self,
80        package: &PackageRef,
81        version: &Version,
82    ) -> Result<Release, Error> {
83        let path = self.version_path(package, version);
84        tracing::debug!(path = %path.display(), "Reading content from path");
85        let content_digest = ContentDigest::sha256_from_file(path).await?;
86        Ok(Release {
87            version: version.clone(),
88            content_digest,
89        })
90    }
91
92    async fn stream_content_unvalidated(
93        &mut self,
94        package: &PackageRef,
95        content: &Release,
96    ) -> Result<BoxStream<Result<Bytes, Error>>, Error> {
97        let path = self.version_path(package, &content.version);
98        tracing::debug!("Streaming content from {path:?}");
99        let file = tokio::fs::File::open(path).await?;
100        Ok(ReaderStream::new(file).map_err(Into::into).boxed())
101    }
102}