warg_client/
storage.rs

1//! A module for client storage implementations.
2
3use anyhow::{Error, Result};
4use async_trait::async_trait;
5use bytes::Bytes;
6use futures_util::Stream;
7use indexmap::IndexMap;
8use reqwest::header::HeaderValue;
9use serde::{Deserialize, Serialize};
10use std::{fmt, path::PathBuf, pin::Pin, str::FromStr, time::SystemTime};
11use warg_crypto::{
12    hash::{AnyHash, HashAlgorithm},
13    signing::{self, KeyID, PublicKey},
14};
15use warg_protocol::{
16    operator,
17    package::{self, PackageRecord, Permission, PACKAGE_RECORD_VERSION},
18    registry::{Checkpoint, PackageName, RecordId, RegistryIndex, TimestampedCheckpoint},
19    ProtoEnvelope, SerdeEnvelope, Version,
20};
21
22mod fs;
23pub use fs::*;
24
25/// Registry domain used for warg header values
26#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
27pub struct RegistryDomain(String);
28
29impl RegistryDomain {
30    /// Creates new `RegistryDomain` from string.
31    pub fn new(registry: String) -> Self {
32        Self(registry)
33    }
34
35    /// Extracts a string slice for the registry domain.
36    pub fn as_str(&self) -> &str {
37        self.0.as_str()
38    }
39}
40
41impl FromStr for RegistryDomain {
42    type Err = Error;
43
44    fn from_str(s: &str) -> std::prelude::v1::Result<Self, Self::Err> {
45        Ok(RegistryDomain(s.to_string()))
46    }
47}
48
49impl fmt::Display for RegistryDomain {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        write!(f, "{registry_domain}", registry_domain = &self.0)
52    }
53}
54
55impl TryFrom<RegistryDomain> for HeaderValue {
56    type Error = Error;
57
58    fn try_from(value: RegistryDomain) -> std::prelude::v1::Result<Self, Self::Error> {
59        Ok(HeaderValue::from_str(&value.to_string())?)
60    }
61}
62
63/// Trait for registry storage implementations.
64///
65/// Stores information such as package/operator logs and checkpoints
66/// on a per-registry basis.
67///
68/// Registry storage data must be synchronized if shared between
69/// multiple threads and processes.
70#[async_trait]
71pub trait RegistryStorage: Send + Sync {
72    /// Reset registry local data
73    async fn reset(&self, all_registries: bool) -> Result<()>;
74
75    // /// Directory where all registries are stored
76    // fn registries_dir(&self) -> PathBuf;
77    /// Loads most recent checkpoint
78    async fn load_checkpoint(
79        &self,
80        namespace_registry: Option<&RegistryDomain>,
81    ) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>>;
82
83    /// Stores most recent checkpoint
84    async fn store_checkpoint(
85        &self,
86        namespace_registry: Option<&RegistryDomain>,
87        ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
88    ) -> Result<()>;
89
90    /// Loads the operator information from the storage.
91    ///
92    /// Returns `Ok(None)` if the information is not present.
93    async fn load_operator(
94        &self,
95        namespace_registry: Option<&RegistryDomain>,
96    ) -> Result<Option<OperatorInfo>>;
97
98    /// Stores the operator information in the storage.
99    async fn store_operator(
100        &self,
101        namespace_registry: Option<&RegistryDomain>,
102        operator: OperatorInfo,
103    ) -> Result<()>;
104
105    /// Loads the package information for all packages.
106    async fn load_all_packages(&self) -> Result<IndexMap<RegistryDomain, Vec<PackageInfo>>>;
107
108    /// Loads the package information from the storage.
109    ///
110    /// Returns `Ok(None)` if the information is not present.
111    async fn load_package(
112        &self,
113        namespace_registry: Option<&RegistryDomain>,
114        package: &PackageName,
115    ) -> Result<Option<PackageInfo>>;
116
117    /// Stores the package information in the storage.
118    async fn store_package(
119        &self,
120        namespace_registry: Option<&RegistryDomain>,
121        info: &PackageInfo,
122    ) -> Result<()>;
123
124    /// Loads information about a pending publish operation.
125    ///
126    /// Returns `Ok(None)` if the information is not present.
127    async fn load_publish(&self) -> Result<Option<PublishInfo>>;
128
129    /// Stores information about a pending publish operation.
130    ///
131    /// If the info is `None`, the any existing publish information is deleted.
132    async fn store_publish(&self, info: Option<&PublishInfo>) -> Result<()>;
133}
134
135/// Trait for content storage implementations.
136///
137/// Content storage data must be synchronized if shared between
138/// multiple threads and processes.
139#[async_trait]
140pub trait ContentStorage: Send + Sync {
141    /// Clear content local data
142    async fn clear(&self) -> Result<()>;
143
144    /// Gets the location of the content associated with the given digest if it
145    /// exists as a file on disk.
146    ///
147    /// Returns `None` if the content is not present on disk.
148    fn content_location(&self, digest: &AnyHash) -> Option<PathBuf>;
149
150    /// Loads the content associated with the given digest as a stream.
151    ///
152    /// If the content is not found, `Ok(None)` is returned.
153    async fn load_content(
154        &self,
155        digest: &AnyHash,
156    ) -> Result<Option<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>>>;
157
158    /// Stores the given stream as content.
159    ///
160    /// If `expected_digest` is `Some`, the storage will verify that the written
161    /// content matches the given digest. If the digests do not match, an
162    /// error is returned.
163    ///
164    /// Returns the hash of the written content.
165    async fn store_content(
166        &self,
167        stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
168        expected_digest: Option<&AnyHash>,
169    ) -> Result<AnyHash>;
170}
171
172/// Trait for namespace map storage implementations.
173///
174/// Namespace Map storage data must be synchronized if shared between
175/// multiple threads an
176#[async_trait]
177pub trait NamespaceMapStorage: Send + Sync {
178    /// Loads namespace map
179    async fn load_namespace_map(&self) -> Result<Option<IndexMap<String, String>>>;
180    /// Reset namespace mappings
181    async fn reset_namespaces(&self) -> Result<()>;
182    /// Store namespace mapping
183    async fn store_namespace(
184        &self,
185        namespace: String,
186        registry_domain: RegistryDomain,
187    ) -> Result<()>;
188}
189
190/// Represents information about a registry operator.
191#[derive(Debug, Clone, Serialize, Deserialize, Default)]
192#[serde(rename_all = "camelCase")]
193pub struct OperatorInfo {
194    /// The registry domain where the package is published.
195    #[serde(default, skip_serializing_if = "Option::is_none")]
196    pub registry: Option<RegistryDomain>,
197    /// The last known checkpoint since checking the registry.
198    #[serde(default, skip_serializing_if = "Option::is_none")]
199    pub checkpoint: Option<Checkpoint>,
200    /// The current operator log state
201    #[serde(default)]
202    pub state: operator::LogState,
203    /// The registry log index of the most recent record
204    #[serde(default, skip_serializing_if = "Option::is_none")]
205    pub head_registry_index: Option<RegistryIndex>,
206    /// The fetch token for the most recent record
207    #[serde(default, skip_serializing_if = "Option::is_none")]
208    pub head_fetch_token: Option<String>,
209}
210
211/// Represents information about a registry package.
212#[derive(Debug, Clone, Serialize, Deserialize)]
213#[serde(rename_all = "camelCase")]
214pub struct PackageInfo {
215    /// The package name to publish.
216    pub name: PackageName,
217    /// The registry domain where the package is published.
218    #[serde(default, skip_serializing_if = "Option::is_none")]
219    pub registry: Option<RegistryDomain>,
220    /// The last known checkpoint since checking the registry.
221    #[serde(default, skip_serializing_if = "Option::is_none")]
222    pub checkpoint: Option<Checkpoint>,
223    /// The current package log state
224    #[serde(default)]
225    pub state: package::LogState,
226    /// The registry log index of the most recent record
227    #[serde(default, skip_serializing_if = "Option::is_none")]
228    pub head_registry_index: Option<RegistryIndex>,
229    /// The fetch token for the most recent record
230    #[serde(default, skip_serializing_if = "Option::is_none")]
231    pub head_fetch_token: Option<String>,
232}
233
234impl PackageInfo {
235    /// Creates a new package info for the given package name.
236    pub fn new(name: impl Into<PackageName>) -> Self {
237        Self {
238            name: name.into(),
239            registry: None,
240            checkpoint: None,
241            state: package::LogState::default(),
242            head_registry_index: None,
243            head_fetch_token: None,
244        }
245    }
246}
247
248/// Represents a record entry being published.
249#[derive(Debug, Clone, Serialize, Deserialize)]
250#[serde(tag = "type", rename_all = "camelCase")]
251pub enum PublishEntry {
252    /// The package is being initialized.
253    Init,
254    /// A new release entry is being published.
255    Release {
256        /// The version of the release.
257        version: Version,
258        /// The content digest of the release.
259        content: AnyHash,
260    },
261    /// A release is being yanked.
262    Yank {
263        /// The version of the release being yanked.
264        version: Version,
265    },
266    /// A key is being granted permission(s).
267    Grant {
268        /// The public key being granted to.
269        key: PublicKey,
270        /// The permission(s) being granted.
271        permissions: Vec<Permission>,
272    },
273    /// A key's permission(s) are being revoked.
274    Revoke {
275        /// The key ID being revoked from.
276        key_id: KeyID,
277        /// The permission(s) being revoked.
278        permissions: Vec<Permission>,
279    },
280}
281
282/// Represents information about a package publish.
283#[derive(Debug, Clone, Serialize, Deserialize)]
284#[serde(rename_all = "camelCase")]
285pub struct PublishInfo {
286    /// The package name being published.
287    pub name: PackageName,
288    /// The last known head of the package log to use.
289    ///
290    /// If `None` and the package is not being initialized,
291    /// the latest head of the package log will be fetched prior to publishing.
292    pub head: Option<RecordId>,
293    /// The new record entries to publish.
294    pub entries: Vec<PublishEntry>,
295}
296
297impl PublishInfo {
298    /// Determines if the publish information is initializing the package.
299    pub fn initializing(&self) -> bool {
300        self.entries.iter().any(|e| matches!(e, PublishEntry::Init))
301    }
302
303    pub(crate) fn finalize(
304        self,
305        signing_key: &signing::PrivateKey,
306    ) -> Result<ProtoEnvelope<PackageRecord>> {
307        let mut entries = Vec::with_capacity(self.entries.len());
308        for entry in self.entries {
309            match entry {
310                PublishEntry::Init => {
311                    entries.push(package::PackageEntry::Init {
312                        hash_algorithm: HashAlgorithm::Sha256,
313                        key: signing_key.public_key(),
314                    });
315                }
316                PublishEntry::Release { version, content } => {
317                    entries.push(package::PackageEntry::Release { version, content });
318                }
319                PublishEntry::Yank { version } => {
320                    entries.push(package::PackageEntry::Yank { version })
321                }
322                PublishEntry::Grant { key, permissions } => {
323                    entries.push(package::PackageEntry::GrantFlat { key, permissions })
324                }
325                PublishEntry::Revoke {
326                    key_id,
327                    permissions,
328                } => entries.push(package::PackageEntry::RevokeFlat {
329                    key_id,
330                    permissions,
331                }),
332            }
333        }
334
335        let record = package::PackageRecord {
336            prev: self.head,
337            version: PACKAGE_RECORD_VERSION,
338            // TODO: this seems wrong to record the current time client-side
339            // How can we guarantee that the timestamps are monotonic?
340            // Should incrementing timestamps even be a requirement?
341            timestamp: SystemTime::now(),
342            entries,
343        };
344
345        Ok(ProtoEnvelope::signed_contents(signing_key, record)?)
346    }
347}