1use std::path::{Path, PathBuf};
6
7use anyhow::anyhow;
8use async_trait::async_trait;
9use futures_util::{StreamExt, TryStreamExt};
10use serde::Deserialize;
11use sha2::{Digest, Sha256};
12use tokio_util::io::ReaderStream;
13use wasm_pkg_common::{
14 config::RegistryConfig,
15 digest::ContentDigest,
16 package::{PackageRef, Version},
17 Error,
18};
19
20use crate::{
21 loader::PackageLoader,
22 publisher::PackagePublisher,
23 release::{Release, VersionInfo},
24 ContentStream, PublishingSource,
25};
26
/// Configuration for the `local` registry backend.
///
/// Deserialized from the registry's `"local"` backend configuration entry.
#[derive(Clone, Debug, Deserialize)]
pub struct LocalConfig {
    /// Root directory of the registry. Package files are located at
    /// `<root>/<namespace>/<name>/<version>.wasm`.
    pub root: PathBuf,
}
31
/// Registry backend that loads and publishes packages as `.wasm` files in a
/// directory tree on the local filesystem.
pub(crate) struct LocalBackend {
    /// Root directory under which all package directories live.
    root: PathBuf,
}
35
36impl LocalBackend {
37 pub fn new(registry_config: RegistryConfig) -> Result<Self, Error> {
38 let config = registry_config
39 .backend_config::<LocalConfig>("local")?
40 .ok_or_else(|| {
41 Error::InvalidConfig(anyhow!("'local' backend requires configuration"))
42 })?;
43 Ok(Self { root: config.root })
44 }
45
46 fn package_dir(&self, package: &PackageRef) -> PathBuf {
47 self.root
48 .join(package.namespace().as_ref())
49 .join(package.name().as_ref())
50 }
51
52 fn version_path(&self, package: &PackageRef, version: &Version) -> PathBuf {
53 self.package_dir(package).join(format!("{version}.wasm"))
54 }
55}
56
57#[async_trait]
58impl PackageLoader for LocalBackend {
59 async fn list_all_versions(&self, package: &PackageRef) -> Result<Vec<VersionInfo>, Error> {
60 let mut versions = vec![];
61 let package_dir = self.package_dir(package);
62 tracing::debug!(?package_dir, "Reading versions from path");
63 let mut entries = tokio::fs::read_dir(package_dir).await?;
64 while let Some(entry) = entries.next_entry().await? {
65 let path = entry.path();
66 if path.extension() != Some("wasm".as_ref()) {
67 continue;
68 }
69 let Some(version) = path
70 .file_stem()
71 .unwrap()
72 .to_str()
73 .and_then(|stem| Version::parse(stem).ok())
74 else {
75 tracing::warn!("invalid package file name at {path:?}");
76 continue;
77 };
78 versions.push(VersionInfo {
79 version,
80 yanked: false,
81 });
82 }
83 Ok(versions)
84 }
85
86 async fn get_release(&self, package: &PackageRef, version: &Version) -> Result<Release, Error> {
87 let path = self.version_path(package, version);
88 tracing::debug!(path = %path.display(), "Reading content from path");
89 let content_digest = sha256_from_file(path).await?;
90 Ok(Release {
91 version: version.clone(),
92 content_digest,
93 })
94 }
95
96 async fn stream_content_unvalidated(
97 &self,
98 package: &PackageRef,
99 content: &Release,
100 ) -> Result<ContentStream, Error> {
101 let path = self.version_path(package, &content.version);
102 tracing::debug!("Streaming content from {path:?}");
103 let file = tokio::fs::File::open(path).await?;
104 Ok(ReaderStream::new(file).map_err(Into::into).boxed())
105 }
106}
107
108#[async_trait::async_trait]
109impl PackagePublisher for LocalBackend {
110 async fn publish(
111 &self,
112 package: &PackageRef,
113 version: &Version,
114 mut data: PublishingSource,
115 ) -> Result<(), Error> {
116 let package_dir = self.package_dir(package);
117 tokio::fs::create_dir_all(package_dir).await?;
119 let path = self.version_path(package, version);
120 let mut out = tokio::fs::File::create(path).await?;
121 tokio::io::copy(&mut data, &mut out)
122 .await
123 .map_err(Error::IoError)
124 .map(|_| ())
125 }
126}
127
128async fn sha256_from_file(path: impl AsRef<Path>) -> Result<ContentDigest, std::io::Error> {
129 use tokio::io::AsyncReadExt;
130 let mut file = tokio::fs::File::open(path).await?;
131 let mut hasher = Sha256::new();
132 let mut buf = [0; 4096];
133 loop {
134 let n = file.read(&mut buf).await?;
135 if n == 0 {
136 break;
137 }
138 hasher.update(&buf[..n]);
139 }
140 Ok(hasher.into())
141}