1use super::{
4 ContentStorage, NamespaceMapStorage, OperatorInfo, PackageInfo, PublishInfo, RegistryDomain,
5 RegistryStorage,
6};
7use crate::lock::FileLock;
8use anyhow::{anyhow, bail, Context, Result};
9use async_trait::async_trait;
10use bytes::Bytes;
11use futures_util::{Stream, StreamExt, TryStreamExt};
12use indexmap::IndexMap;
13use serde::{Deserialize, Serialize};
14use std::{
15 ffi::OsStr,
16 fs,
17 path::{Path, PathBuf},
18 pin::Pin,
19 str::FromStr,
20};
21use tempfile::NamedTempFile;
22use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
23use tokio_util::io::ReaderStream;
24use walkdir::WalkDir;
25use warg_crypto::hash::{AnyHash, Digest, Hash, Sha256};
26use warg_protocol::{
27 registry::{LogId, PackageName, TimestampedCheckpoint},
28 SerdeEnvelope,
29};
30
/// Name of the lock file opened inside each storage's base directory.
const LOCK_FILE_NAME: &str = ".lock";
/// Directory (inside a registry's directory) that holds per-package state files.
const PACKAGE_LOGS_DIR: &str = "package-logs";
/// File (inside the registry base directory) that records a pending publish.
const PENDING_PUBLISH_FILE: &str = "pending-publish.json";
/// Subdirectory of the content storage directory where content is staged
/// before being moved to its final, digest-derived location.
const TEMP_DIRECTORY: &str = "temp";
35
36pub struct FileSystemRegistryStorage {
38 _lock: FileLock,
39 base_dir: PathBuf,
40 registries_dir: PathBuf,
41}
42
43impl FileSystemRegistryStorage {
44 pub fn try_lock(base_dir: impl Into<PathBuf>) -> Result<Option<Self>> {
50 let base_dir = base_dir.into();
51 let registries_dir = &mut base_dir
52 .parent()
53 .context("base_dir cannot be empty")?
54 .to_path_buf();
55 match FileLock::try_open_rw(base_dir.join(LOCK_FILE_NAME))? {
56 Some(lock) => Ok(Some(Self {
57 _lock: lock,
58 base_dir,
59 registries_dir: registries_dir.to_path_buf(),
60 })),
61 None => Ok(None),
62 }
63 }
64
65 pub fn lock(base_dir: impl Into<PathBuf>) -> Result<Self> {
72 let base_dir = base_dir.into();
73 let lock = FileLock::open_rw(base_dir.join(LOCK_FILE_NAME))?;
74 let registries_dir = &mut base_dir
75 .parent()
76 .context("base_dir cannot be empty")?
77 .to_path_buf();
78 Ok(Self {
79 _lock: lock,
80 base_dir,
81 registries_dir: registries_dir.to_path_buf(),
82 })
83 }
84
85 fn operator_path(&self, namespace_registry: Option<&RegistryDomain>) -> PathBuf {
86 if let Some(nm) = namespace_registry {
87 return self
88 .registries_dir
89 .join(nm.to_string())
90 .join("operator.log");
91 }
92 self.base_dir.join("operator.log")
93 }
94
95 fn package_path(
96 &self,
97 namespace_registry: Option<&RegistryDomain>,
98 name: &PackageName,
99 ) -> PathBuf {
100 if let Some(nm) = namespace_registry {
101 return self
102 .registries_dir
103 .join(nm.to_string())
104 .join(PACKAGE_LOGS_DIR)
105 .join(
106 LogId::package_log::<Sha256>(name)
107 .to_string()
108 .replace(':', "/"),
109 );
110 }
111 self.base_dir.join(PACKAGE_LOGS_DIR).join(
112 LogId::package_log::<Sha256>(name)
113 .to_string()
114 .replace(':', "/"),
115 )
116 }
117
118 fn pending_publish_path(&self) -> PathBuf {
119 self.base_dir.join(PENDING_PUBLISH_FILE)
120 }
121}
122
123#[async_trait]
124impl RegistryStorage for FileSystemRegistryStorage {
125 async fn reset(&self, all_registries: bool) -> Result<()> {
126 if all_registries {
127 remove(self.base_dir.parent().unwrap()).await
128 } else {
129 remove(&self.base_dir).await
130 }
131 }
132
133 async fn load_checkpoint(
134 &self,
135 namespace_registry: Option<&RegistryDomain>,
136 ) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>> {
137 if let Some(nm) = namespace_registry {
138 return load(&self.registries_dir.join(nm.to_string()).join("checkpoint")).await;
139 }
140 load(&self.base_dir.join("checkpoint")).await
141 }
142
143 async fn store_checkpoint(
144 &self,
145 namespace_registry: Option<&RegistryDomain>,
146 ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
147 ) -> Result<()> {
148 if let Some(nm) = namespace_registry {
149 return store(
150 &self.registries_dir.join(nm.to_string()).join("checkpoint"),
151 ts_checkpoint,
152 )
153 .await;
154 }
155 store(&self.base_dir.join("checkpoint"), ts_checkpoint).await
156 }
157
158 async fn load_all_packages(&self) -> Result<IndexMap<RegistryDomain, Vec<PackageInfo>>> {
159 let mut all_packages = IndexMap::new();
160 let regs = fs::read_dir(self.registries_dir.clone())?;
161 for reg in regs {
162 let folder = reg?;
163 if let Some(name) = folder.file_name().to_str() {
164 let packages_dir = self
165 .registries_dir
166 .join(folder.file_name())
167 .join(PACKAGE_LOGS_DIR);
168 let mut packages = Vec::new();
169 for entry in WalkDir::new(&packages_dir).into_iter().flatten() {
170 let path = entry.path();
171 if !path.is_file() {
172 continue;
173 }
174
175 if let Some(name) = path.file_name().and_then(OsStr::to_str) {
176 if name.starts_with('.') {
177 continue;
178 }
179 }
180
181 let info: PackageInfo = load(path).await?.ok_or_else(|| {
182 anyhow!(
183 "failed to load package state from `{path}`",
184 path = path.display()
185 )
186 })?;
187 packages.push(info);
188 }
189 all_packages.insert(RegistryDomain::from_str(name)?, packages);
190 };
191 }
192 Ok(all_packages)
193 }
194
195 async fn load_operator(
196 &self,
197 namespace_registry: Option<&RegistryDomain>,
198 ) -> Result<Option<OperatorInfo>> {
199 Ok(load(&self.operator_path(namespace_registry)).await?)
200 }
201
202 async fn store_operator(
203 &self,
204 namespace_registry: Option<&RegistryDomain>,
205 info: OperatorInfo,
206 ) -> Result<()> {
207 store(&self.operator_path(namespace_registry), info).await
208 }
209
210 async fn load_package(
211 &self,
212 namespace_registry: Option<&RegistryDomain>,
213 package: &PackageName,
214 ) -> Result<Option<PackageInfo>> {
215 Ok(load(&self.package_path(namespace_registry, package)).await?)
216 }
217
218 async fn store_package(
219 &self,
220 namespace_registry: Option<&RegistryDomain>,
221 info: &PackageInfo,
222 ) -> Result<()> {
223 store(&self.package_path(namespace_registry, &info.name), info).await
224 }
225
226 async fn load_publish(&self) -> Result<Option<PublishInfo>> {
227 Ok(load(&self.base_dir.join(PENDING_PUBLISH_FILE))
228 .await?
229 .unwrap_or_default())
230 }
231
232 async fn store_publish(&self, info: Option<&PublishInfo>) -> Result<()> {
233 let path = self.pending_publish_path();
234 match info {
235 Some(info) => store(&path, info).await,
236 None => delete(&path).await,
237 }
238 }
239}
240
241pub struct FileSystemContentStorage {
243 _lock: FileLock,
244 base_dir: PathBuf,
245 temp_dir: PathBuf,
246}
247
248impl FileSystemContentStorage {
249 pub fn try_lock(base_dir: impl Into<PathBuf>) -> Result<Option<Self>> {
255 let base_dir = base_dir.into();
256 let temp_dir = base_dir.join(TEMP_DIRECTORY);
257 match FileLock::try_open_rw(base_dir.join(LOCK_FILE_NAME))? {
258 Some(lock) => Ok(Some(Self {
259 _lock: lock,
260 base_dir,
261 temp_dir,
262 })),
263 None => Ok(None),
264 }
265 }
266
267 pub fn lock(base_dir: impl Into<PathBuf>) -> Result<Self> {
274 let base_dir = base_dir.into();
275 let temp_dir = base_dir.join(TEMP_DIRECTORY);
276 let lock = FileLock::open_rw(base_dir.join(LOCK_FILE_NAME))?;
277 Ok(Self {
278 _lock: lock,
279 base_dir,
280 temp_dir,
281 })
282 }
283
284 fn temp_file(&self) -> Result<NamedTempFile> {
285 fs::create_dir_all(&self.temp_dir).with_context(|| {
286 format!(
287 "failed to create directory `{path}`",
288 path = self.temp_dir.display()
289 )
290 })?;
291
292 NamedTempFile::new_in(&self.temp_dir).with_context(|| {
293 format!(
294 "failed to create temporary file in `{path}`",
295 path = self.temp_dir.display()
296 )
297 })
298 }
299
300 fn content_path(&self, digest: &AnyHash) -> PathBuf {
301 self.base_dir.join(digest.to_string().replace(':', "/"))
302 }
303}
304
305#[async_trait]
306impl ContentStorage for FileSystemContentStorage {
307 async fn clear(&self) -> Result<()> {
308 remove(&self.base_dir).await
309 }
310
311 fn content_location(&self, digest: &AnyHash) -> Option<PathBuf> {
312 let path = self.content_path(digest);
313 if path.is_file() {
314 Some(path)
315 } else {
316 None
317 }
318 }
319
320 async fn load_content(
321 &self,
322 digest: &AnyHash,
323 ) -> Result<Option<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>>> {
324 let path = self.content_path(digest);
325 if !path.is_file() {
326 return Ok(None);
327 }
328
329 Ok(Some(Box::pin(
330 ReaderStream::new(BufReader::new(
331 tokio::fs::File::open(&path)
332 .await
333 .with_context(|| format!("failed to open `{path}`", path = path.display()))?,
334 ))
335 .map_err(|e| anyhow!(e)),
336 )))
337 }
338
339 async fn store_content(
340 &self,
341 mut stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
342 expected_digest: Option<&AnyHash>,
343 ) -> Result<AnyHash> {
344 let (file, path) = self.temp_file()?.into_parts();
345 let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
346 let mut hasher = Sha256::new();
347
348 while let Some(bytes) = stream.next().await.transpose()? {
349 hasher.update(&bytes);
350 writer
351 .write_all(&bytes)
352 .await
353 .with_context(|| format!("failed to write to `{path}`", path = path.display()))?;
354 }
355
356 let hash = AnyHash::from(Hash::<Sha256>::from(hasher.finalize()));
357
358 if let Some(expected) = expected_digest {
359 if hash != *expected {
360 bail!(
361 "stored content has digest `{hash}` but a digest of `{expected}` was expected",
362 );
363 }
364 }
365
366 writer
367 .shutdown()
368 .await
369 .with_context(|| format!("failed to write `{path}`", path = path.display()))?;
370
371 drop(writer);
372
373 let content_path = self.content_path(&hash);
374 if !content_path.is_file() {
375 if let Some(parent) = content_path.parent() {
376 fs::create_dir_all(parent).with_context(|| {
377 format!(
378 "failed to create directory `{path}`",
379 path = parent.display()
380 )
381 })?;
382 }
383
384 path.persist(&content_path).with_context(|| {
385 format!(
386 "failed to persist temporary file to `{path}`",
387 path = content_path.display()
388 )
389 })?;
390 }
391
392 Ok(hash)
393 }
394}
395
/// Namespace-to-registry mapping kept in a single JSON file at `path`.
pub struct FileSystemNamespaceMapStorage {
    path: PathBuf,
}

impl FileSystemNamespaceMapStorage {
    /// Creates a storage that reads and writes the namespace map at `path`.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }
}
407
408#[async_trait]
409impl NamespaceMapStorage for FileSystemNamespaceMapStorage {
410 async fn load_namespace_map(&self) -> Result<Option<IndexMap<String, String>>> {
411 let namespace_path = &self.path;
412 let namespace_map = load(namespace_path).await?;
413 Ok(namespace_map)
414 }
415
416 async fn reset_namespaces(&self) -> Result<()> {
417 delete(&self.path).await?;
418 Ok(())
419 }
420
421 async fn store_namespace(
422 &self,
423 namespace: String,
424 registry_domain: RegistryDomain,
425 ) -> Result<()> {
426 let mut mapping = self.load_namespace_map().await?.unwrap_or_default();
427 mapping.insert(namespace, registry_domain.to_string());
428 let json = serde_json::to_string(&mapping)?;
429 fs::write(&self.path, json)?;
430 Ok(())
431 }
432}
433
434async fn remove(path: &Path) -> Result<()> {
435 if path.is_file() {
436 return tokio::fs::remove_file(path)
437 .await
438 .with_context(|| format!("failed to remove file `{path}`", path = path.display()));
439 }
440
441 tokio::fs::remove_dir_all(path)
442 .await
443 .with_context(|| format!("failed to remove directory `{path}`", path = path.display()))
444}
445
446async fn load<T: for<'a> Deserialize<'a>>(path: &Path) -> Result<Option<T>> {
447 if !path.is_file() {
448 return Ok(None);
449 }
450
451 let contents = tokio::fs::read_to_string(path)
452 .await
453 .with_context(|| format!("failed to read `{path}`", path = path.display()))?;
454
455 serde_json::from_str(&contents).with_context(|| {
456 format!(
457 "failed to deserialize contents of `{path}`",
458 path = path.display()
459 )
460 })
461}
462
463async fn store(path: &Path, value: impl Serialize) -> Result<()> {
464 if let Some(parent) = path.parent() {
465 std::fs::create_dir_all(parent).with_context(|| {
466 format!(
467 "failed to create parent directory for `{path}`",
468 path = path.display()
469 )
470 })?;
471 }
472
473 let contents = serde_json::to_vec_pretty(&value).with_context(|| {
474 format!(
475 "failed to serialize contents of `{path}`",
476 path = path.display()
477 )
478 })?;
479
480 tokio::fs::write(path, contents)
481 .await
482 .with_context(|| format!("failed to write `{path}`", path = path.display()))
483}
484
485async fn delete(path: &Path) -> Result<()> {
486 if path.is_file() {
487 tokio::fs::remove_file(path)
488 .await
489 .with_context(|| format!("failed to delete file `{path}`", path = path.display()))?;
490 }
491
492 Ok(())
493}