1use async_trait::async_trait;
15use pf_core::cas::BlobStore;
16use pf_core::digest::Digest256;
17use pf_core::manifest::Manifest;
18
19use crate::image_ref::ImageRef;
20use crate::registry::{LayerSet, Registry, RegistryError, transitive_blob_digests};
21use crate::sign::{sign_manifest, verify_manifest};
22
23#[derive(Default, Debug)]
26pub struct FileRegistry {
27 sign_key: Option<String>,
31}
32
33impl FileRegistry {
34 pub fn new() -> Self {
35 Self {
36 sign_key: std::env::var("PF_FILE_REG_SIGN_KEY").ok(),
37 }
38 }
39
40 #[must_use]
41 pub fn with_sign_key(mut self, key: impl Into<String>) -> Self {
42 self.sign_key = Some(key.into());
43 self
44 }
45
46 fn root(target: &ImageRef) -> Result<&std::path::Path, RegistryError> {
47 match target {
48 ImageRef::File { path } => Ok(path.as_path()),
49 other => Err(RegistryError::Backend(format!(
50 "FileRegistry called with non-file ref {other:?}"
51 ))),
52 }
53 }
54}
55
56#[async_trait]
57impl Registry for FileRegistry {
58 async fn push(
59 &self,
60 target: &ImageRef,
61 manifest: &Manifest,
62 blobs: &dyn BlobStore,
63 ) -> Result<(), RegistryError> {
64 let root = Self::root(target)?;
65 std::fs::create_dir_all(root.join("blobs").join("sha256"))
66 .map_err(|e| RegistryError::Backend(format!("mkdir: {e}")))?;
67
68 let manifest_bytes = serde_json::to_vec(manifest)
70 .map_err(|e| RegistryError::Backend(format!("manifest serialize: {e}")))?;
71 std::fs::write(root.join("manifest.json"), &manifest_bytes)
72 .map_err(|e| RegistryError::Backend(format!("write manifest: {e}")))?;
73
74 let sig = sign_manifest(&manifest_bytes, self.sign_key.as_deref());
76 std::fs::write(
77 root.join("manifest.json.sig"),
78 serde_json::to_vec(&sig).unwrap(),
79 )
80 .map_err(|e| RegistryError::Backend(format!("write sig: {e}")))?;
81
82 for digest in transitive_blob_digests(manifest, blobs)? {
86 copy_blob(blobs, root, &digest)?;
87 }
88 Ok(())
89 }
90
91 async fn pull(&self, source: &ImageRef) -> Result<LayerSet, RegistryError> {
92 let root = Self::root(source)?;
93 let manifest_bytes = std::fs::read(root.join("manifest.json"))
94 .map_err(|e| RegistryError::Backend(format!("read manifest: {e}")))?;
95
96 let sig_bytes = std::fs::read(root.join("manifest.json.sig"))
98 .map_err(|e| RegistryError::Backend(format!("read sig: {e}")))?;
99 let sig = serde_json::from_slice(&sig_bytes)
100 .map_err(|e| RegistryError::SignatureVerify(format!("parse sig: {e}")))?;
101 verify_manifest(&manifest_bytes, &sig, self.sign_key.as_deref())
102 .map_err(RegistryError::SignatureVerify)?;
103
104 let manifest: Manifest = serde_json::from_slice(&manifest_bytes)
105 .map_err(|e| RegistryError::Backend(format!("parse manifest: {e}")))?;
106
107 let mut blobs = Vec::new();
115 let blobs_root = root.join("blobs").join("sha256");
116 if blobs_root.exists() {
117 for shard in std::fs::read_dir(&blobs_root)
118 .map_err(|e| RegistryError::Backend(format!("read blobs/: {e}")))?
119 {
120 let shard = shard.map_err(|e| RegistryError::Backend(format!("shard: {e}")))?;
121 if !shard
122 .file_type()
123 .map_err(|e| RegistryError::Backend(format!("shard ft: {e}")))?
124 .is_dir()
125 {
126 continue;
127 }
128 for blob in std::fs::read_dir(shard.path())
129 .map_err(|e| RegistryError::Backend(format!("read shard: {e}")))?
130 {
131 let blob = blob.map_err(|e| RegistryError::Backend(format!("blob: {e}")))?;
132 let name = blob.file_name().to_string_lossy().to_string();
133 let hex = name.strip_suffix(".zst").unwrap_or(&name);
134 let Ok(d) = Digest256::parse(&format!("sha256:{hex}")) else {
135 continue;
136 };
137 let bytes = read_blob(root, &d)?;
138 blobs.push((d, bytes));
139 }
140 }
141 }
142 Ok(LayerSet { manifest, blobs })
143 }
144
145 async fn exists(&self, source: &ImageRef) -> Result<bool, RegistryError> {
146 Ok(Self::root(source)?.join("manifest.json").exists())
147 }
148}
149
150fn blob_path(root: &std::path::Path, d: &Digest256) -> std::path::PathBuf {
151 let hex = d.hex();
152 root.join("blobs")
153 .join("sha256")
154 .join(&hex[..2])
155 .join(format!("{hex}.zst"))
156}
157
158fn copy_blob(
159 blobs: &dyn BlobStore,
160 root: &std::path::Path,
161 d: &Digest256,
162) -> Result<(), RegistryError> {
163 let dest = blob_path(root, d);
164 if dest.exists() {
165 return Ok(());
166 }
167 let bytes = blobs.get(d)?;
168 let compressed = zstd::encode_all(bytes.as_slice(), 19)
169 .map_err(|e| RegistryError::Backend(format!("zstd encode: {e}")))?;
170 if let Some(parent) = dest.parent() {
171 std::fs::create_dir_all(parent)
172 .map_err(|e| RegistryError::Backend(format!("mkdir blob shard: {e}")))?;
173 }
174 std::fs::write(&dest, compressed)
175 .map_err(|e| RegistryError::Backend(format!("write blob: {e}")))?;
176 Ok(())
177}
178
179fn read_blob(root: &std::path::Path, d: &Digest256) -> Result<Vec<u8>, RegistryError> {
180 let src = blob_path(root, d);
181 let compressed = std::fs::read(&src)
182 .map_err(|e| RegistryError::Backend(format!("read blob {}: {e}", src.display())))?;
183 let bytes = zstd::decode_all(compressed.as_slice())
184 .map_err(|e| RegistryError::Backend(format!("zstd decode: {e}")))?;
185 let observed = Digest256::of(&bytes);
186 if &observed != d {
187 return Err(RegistryError::Core(pf_core::Error::Integrity(format!(
188 "registry blob {d} hashes to {observed}"
189 ))));
190 }
191 Ok(bytes)
192}