warg_client/storage/
fs.rs

1//! A module for file system client storage.
2
3use super::{
4    ContentStorage, NamespaceMapStorage, OperatorInfo, PackageInfo, PublishInfo, RegistryDomain,
5    RegistryStorage,
6};
7use crate::lock::FileLock;
8use anyhow::{anyhow, bail, Context, Result};
9use async_trait::async_trait;
10use bytes::Bytes;
11use futures_util::{Stream, StreamExt, TryStreamExt};
12use indexmap::IndexMap;
13use serde::{Deserialize, Serialize};
14use std::{
15    ffi::OsStr,
16    fs,
17    path::{Path, PathBuf},
18    pin::Pin,
19    str::FromStr,
20};
21use tempfile::NamedTempFile;
22use tokio::io::{AsyncWriteExt, BufReader, BufWriter};
23use tokio_util::io::ReaderStream;
24use walkdir::WalkDir;
25use warg_crypto::hash::{AnyHash, Digest, Hash, Sha256};
26use warg_protocol::{
27    registry::{LogId, PackageName, TimestampedCheckpoint},
28    SerdeEnvelope,
29};
30
/// Name of the directory, joined onto the content storage's base directory,
/// in which temporary files are created.
const TEMP_DIRECTORY: &str = "temp";
/// Name of the file, joined onto the registry storage's base directory, that
/// holds the pending publish information.
const PENDING_PUBLISH_FILE: &str = "pending-publish.json";
/// Name of the lock file opened inside a storage's base directory.
const LOCK_FILE_NAME: &str = ".lock";
/// Name of the directory, joined onto a registry's directory, under which
/// package state files are stored.
const PACKAGE_LOGS_DIR: &str = "package-logs";
35
36/// Represents a package storage using the local file system.
37pub struct FileSystemRegistryStorage {
38    _lock: FileLock,
39    base_dir: PathBuf,
40    registries_dir: PathBuf,
41}
42
43impl FileSystemRegistryStorage {
44    /// Attempts to lock the package storage.
45    ///
46    /// The base directory will be created if it does not exist.
47    ///
48    /// If the lock cannot be acquired, `Ok(None)` is returned.
49    pub fn try_lock(base_dir: impl Into<PathBuf>) -> Result<Option<Self>> {
50        let base_dir = base_dir.into();
51        let registries_dir = &mut base_dir
52            .parent()
53            .context("base_dir cannot be empty")?
54            .to_path_buf();
55        match FileLock::try_open_rw(base_dir.join(LOCK_FILE_NAME))? {
56            Some(lock) => Ok(Some(Self {
57                _lock: lock,
58                base_dir,
59                registries_dir: registries_dir.to_path_buf(),
60            })),
61            None => Ok(None),
62        }
63    }
64
65    /// Locks a new package storage at the given base directory.
66    ///
67    /// The base directory will be created if it does not exist.
68    ///
69    /// If the lock cannot be immediately acquired, this function
70    /// will block.
71    pub fn lock(base_dir: impl Into<PathBuf>) -> Result<Self> {
72        let base_dir = base_dir.into();
73        let lock = FileLock::open_rw(base_dir.join(LOCK_FILE_NAME))?;
74        let registries_dir = &mut base_dir
75            .parent()
76            .context("base_dir cannot be empty")?
77            .to_path_buf();
78        Ok(Self {
79            _lock: lock,
80            base_dir,
81            registries_dir: registries_dir.to_path_buf(),
82        })
83    }
84
85    fn operator_path(&self, namespace_registry: Option<&RegistryDomain>) -> PathBuf {
86        if let Some(nm) = namespace_registry {
87            return self
88                .registries_dir
89                .join(nm.to_string())
90                .join("operator.log");
91        }
92        self.base_dir.join("operator.log")
93    }
94
95    fn package_path(
96        &self,
97        namespace_registry: Option<&RegistryDomain>,
98        name: &PackageName,
99    ) -> PathBuf {
100        if let Some(nm) = namespace_registry {
101            return self
102                .registries_dir
103                .join(nm.to_string())
104                .join(PACKAGE_LOGS_DIR)
105                .join(
106                    LogId::package_log::<Sha256>(name)
107                        .to_string()
108                        .replace(':', "/"),
109                );
110        }
111        self.base_dir.join(PACKAGE_LOGS_DIR).join(
112            LogId::package_log::<Sha256>(name)
113                .to_string()
114                .replace(':', "/"),
115        )
116    }
117
118    fn pending_publish_path(&self) -> PathBuf {
119        self.base_dir.join(PENDING_PUBLISH_FILE)
120    }
121}
122
123#[async_trait]
124impl RegistryStorage for FileSystemRegistryStorage {
125    async fn reset(&self, all_registries: bool) -> Result<()> {
126        if all_registries {
127            remove(self.base_dir.parent().unwrap()).await
128        } else {
129            remove(&self.base_dir).await
130        }
131    }
132
133    async fn load_checkpoint(
134        &self,
135        namespace_registry: Option<&RegistryDomain>,
136    ) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>> {
137        if let Some(nm) = namespace_registry {
138            return load(&self.registries_dir.join(nm.to_string()).join("checkpoint")).await;
139        }
140        load(&self.base_dir.join("checkpoint")).await
141    }
142
143    async fn store_checkpoint(
144        &self,
145        namespace_registry: Option<&RegistryDomain>,
146        ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
147    ) -> Result<()> {
148        if let Some(nm) = namespace_registry {
149            return store(
150                &self.registries_dir.join(nm.to_string()).join("checkpoint"),
151                ts_checkpoint,
152            )
153            .await;
154        }
155        store(&self.base_dir.join("checkpoint"), ts_checkpoint).await
156    }
157
158    async fn load_all_packages(&self) -> Result<IndexMap<RegistryDomain, Vec<PackageInfo>>> {
159        let mut all_packages = IndexMap::new();
160        let regs = fs::read_dir(self.registries_dir.clone())?;
161        for reg in regs {
162            let folder = reg?;
163            if let Some(name) = folder.file_name().to_str() {
164                let packages_dir = self
165                    .registries_dir
166                    .join(folder.file_name())
167                    .join(PACKAGE_LOGS_DIR);
168                let mut packages = Vec::new();
169                for entry in WalkDir::new(&packages_dir).into_iter().flatten() {
170                    let path = entry.path();
171                    if !path.is_file() {
172                        continue;
173                    }
174
175                    if let Some(name) = path.file_name().and_then(OsStr::to_str) {
176                        if name.starts_with('.') {
177                            continue;
178                        }
179                    }
180
181                    let info: PackageInfo = load(path).await?.ok_or_else(|| {
182                        anyhow!(
183                            "failed to load package state from `{path}`",
184                            path = path.display()
185                        )
186                    })?;
187                    packages.push(info);
188                }
189                all_packages.insert(RegistryDomain::from_str(name)?, packages);
190            };
191        }
192        Ok(all_packages)
193    }
194
195    async fn load_operator(
196        &self,
197        namespace_registry: Option<&RegistryDomain>,
198    ) -> Result<Option<OperatorInfo>> {
199        Ok(load(&self.operator_path(namespace_registry)).await?)
200    }
201
202    async fn store_operator(
203        &self,
204        namespace_registry: Option<&RegistryDomain>,
205        info: OperatorInfo,
206    ) -> Result<()> {
207        store(&self.operator_path(namespace_registry), info).await
208    }
209
210    async fn load_package(
211        &self,
212        namespace_registry: Option<&RegistryDomain>,
213        package: &PackageName,
214    ) -> Result<Option<PackageInfo>> {
215        Ok(load(&self.package_path(namespace_registry, package)).await?)
216    }
217
218    async fn store_package(
219        &self,
220        namespace_registry: Option<&RegistryDomain>,
221        info: &PackageInfo,
222    ) -> Result<()> {
223        store(&self.package_path(namespace_registry, &info.name), info).await
224    }
225
226    async fn load_publish(&self) -> Result<Option<PublishInfo>> {
227        Ok(load(&self.base_dir.join(PENDING_PUBLISH_FILE))
228            .await?
229            .unwrap_or_default())
230    }
231
232    async fn store_publish(&self, info: Option<&PublishInfo>) -> Result<()> {
233        let path = self.pending_publish_path();
234        match info {
235            Some(info) => store(&path, info).await,
236            None => delete(&path).await,
237        }
238    }
239}
240
241/// Represents a content storage using the local file system.
242pub struct FileSystemContentStorage {
243    _lock: FileLock,
244    base_dir: PathBuf,
245    temp_dir: PathBuf,
246}
247
248impl FileSystemContentStorage {
249    /// Attempts to lock the content storage.
250    ///
251    /// The base directory will be created if it does not exist.
252    ///
253    /// If the lock cannot be acquired, `Ok(None)` is returned.
254    pub fn try_lock(base_dir: impl Into<PathBuf>) -> Result<Option<Self>> {
255        let base_dir = base_dir.into();
256        let temp_dir = base_dir.join(TEMP_DIRECTORY);
257        match FileLock::try_open_rw(base_dir.join(LOCK_FILE_NAME))? {
258            Some(lock) => Ok(Some(Self {
259                _lock: lock,
260                base_dir,
261                temp_dir,
262            })),
263            None => Ok(None),
264        }
265    }
266
267    /// Locks a new content storage at the given base directory.
268    ///
269    /// The base directory will be created if it does not exist.
270    ///
271    /// If the lock cannot be immediately acquired, this function
272    /// will block.
273    pub fn lock(base_dir: impl Into<PathBuf>) -> Result<Self> {
274        let base_dir = base_dir.into();
275        let temp_dir = base_dir.join(TEMP_DIRECTORY);
276        let lock = FileLock::open_rw(base_dir.join(LOCK_FILE_NAME))?;
277        Ok(Self {
278            _lock: lock,
279            base_dir,
280            temp_dir,
281        })
282    }
283
284    fn temp_file(&self) -> Result<NamedTempFile> {
285        fs::create_dir_all(&self.temp_dir).with_context(|| {
286            format!(
287                "failed to create directory `{path}`",
288                path = self.temp_dir.display()
289            )
290        })?;
291
292        NamedTempFile::new_in(&self.temp_dir).with_context(|| {
293            format!(
294                "failed to create temporary file in `{path}`",
295                path = self.temp_dir.display()
296            )
297        })
298    }
299
300    fn content_path(&self, digest: &AnyHash) -> PathBuf {
301        self.base_dir.join(digest.to_string().replace(':', "/"))
302    }
303}
304
305#[async_trait]
306impl ContentStorage for FileSystemContentStorage {
307    async fn clear(&self) -> Result<()> {
308        remove(&self.base_dir).await
309    }
310
311    fn content_location(&self, digest: &AnyHash) -> Option<PathBuf> {
312        let path = self.content_path(digest);
313        if path.is_file() {
314            Some(path)
315        } else {
316            None
317        }
318    }
319
320    async fn load_content(
321        &self,
322        digest: &AnyHash,
323    ) -> Result<Option<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>>> {
324        let path = self.content_path(digest);
325        if !path.is_file() {
326            return Ok(None);
327        }
328
329        Ok(Some(Box::pin(
330            ReaderStream::new(BufReader::new(
331                tokio::fs::File::open(&path)
332                    .await
333                    .with_context(|| format!("failed to open `{path}`", path = path.display()))?,
334            ))
335            .map_err(|e| anyhow!(e)),
336        )))
337    }
338
339    async fn store_content(
340        &self,
341        mut stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
342        expected_digest: Option<&AnyHash>,
343    ) -> Result<AnyHash> {
344        let (file, path) = self.temp_file()?.into_parts();
345        let mut writer = BufWriter::new(tokio::fs::File::from_std(file));
346        let mut hasher = Sha256::new();
347
348        while let Some(bytes) = stream.next().await.transpose()? {
349            hasher.update(&bytes);
350            writer
351                .write_all(&bytes)
352                .await
353                .with_context(|| format!("failed to write to `{path}`", path = path.display()))?;
354        }
355
356        let hash = AnyHash::from(Hash::<Sha256>::from(hasher.finalize()));
357
358        if let Some(expected) = expected_digest {
359            if hash != *expected {
360                bail!(
361                    "stored content has digest `{hash}` but a digest of `{expected}` was expected",
362                );
363            }
364        }
365
366        writer
367            .shutdown()
368            .await
369            .with_context(|| format!("failed to write `{path}`", path = path.display()))?;
370
371        drop(writer);
372
373        let content_path = self.content_path(&hash);
374        if !content_path.is_file() {
375            if let Some(parent) = content_path.parent() {
376                fs::create_dir_all(parent).with_context(|| {
377                    format!(
378                        "failed to create directory `{path}`",
379                        path = parent.display()
380                    )
381                })?;
382            }
383
384            path.persist(&content_path).with_context(|| {
385                format!(
386                    "failed to persist temporary file to `{path}`",
387                    path = content_path.display()
388                )
389            })?;
390        }
391
392        Ok(hash)
393    }
394}
395
/// Stores the namespace-to-registry-domain mapping in a file on the local
/// file system.
pub struct FileSystemNamespaceMapStorage {
    /// Location of the file holding the mapping.
    path: PathBuf,
}

impl FileSystemNamespaceMapStorage {
    /// Builds a namespace map storage for the mapping file at `path`.
    ///
    /// Only the path is recorded here; the file itself is not touched.
    pub fn new(path: impl Into<PathBuf>) -> Self {
        let path: PathBuf = path.into();
        Self { path }
    }
}
407
408#[async_trait]
409impl NamespaceMapStorage for FileSystemNamespaceMapStorage {
410    async fn load_namespace_map(&self) -> Result<Option<IndexMap<String, String>>> {
411        let namespace_path = &self.path;
412        let namespace_map = load(namespace_path).await?;
413        Ok(namespace_map)
414    }
415
416    async fn reset_namespaces(&self) -> Result<()> {
417        delete(&self.path).await?;
418        Ok(())
419    }
420
421    async fn store_namespace(
422        &self,
423        namespace: String,
424        registry_domain: RegistryDomain,
425    ) -> Result<()> {
426        let mut mapping = self.load_namespace_map().await?.unwrap_or_default();
427        mapping.insert(namespace, registry_domain.to_string());
428        let json = serde_json::to_string(&mapping)?;
429        fs::write(&self.path, json)?;
430        Ok(())
431    }
432}
433
434async fn remove(path: &Path) -> Result<()> {
435    if path.is_file() {
436        return tokio::fs::remove_file(path)
437            .await
438            .with_context(|| format!("failed to remove file `{path}`", path = path.display()));
439    }
440
441    tokio::fs::remove_dir_all(path)
442        .await
443        .with_context(|| format!("failed to remove directory `{path}`", path = path.display()))
444}
445
446async fn load<T: for<'a> Deserialize<'a>>(path: &Path) -> Result<Option<T>> {
447    if !path.is_file() {
448        return Ok(None);
449    }
450
451    let contents = tokio::fs::read_to_string(path)
452        .await
453        .with_context(|| format!("failed to read `{path}`", path = path.display()))?;
454
455    serde_json::from_str(&contents).with_context(|| {
456        format!(
457            "failed to deserialize contents of `{path}`",
458            path = path.display()
459        )
460    })
461}
462
463async fn store(path: &Path, value: impl Serialize) -> Result<()> {
464    if let Some(parent) = path.parent() {
465        std::fs::create_dir_all(parent).with_context(|| {
466            format!(
467                "failed to create parent directory for `{path}`",
468                path = path.display()
469            )
470        })?;
471    }
472
473    let contents = serde_json::to_vec_pretty(&value).with_context(|| {
474        format!(
475            "failed to serialize contents of `{path}`",
476            path = path.display()
477        )
478    })?;
479
480    tokio::fs::write(path, contents)
481        .await
482        .with_context(|| format!("failed to write `{path}`", path = path.display()))
483}
484
485async fn delete(path: &Path) -> Result<()> {
486    if path.is_file() {
487        tokio::fs::remove_file(path)
488            .await
489            .with_context(|| format!("failed to delete file `{path}`", path = path.display()))?;
490    }
491
492    Ok(())
493}