use anyhow::{Error, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::Stream;
use indexmap::IndexMap;
use reqwest::header::HeaderValue;
use serde::{Deserialize, Serialize};
use std::{fmt, path::PathBuf, pin::Pin, str::FromStr, time::SystemTime};
use warg_crypto::{
    hash::{AnyHash, HashAlgorithm},
    signing::{self, KeyID, PublicKey},
};
use warg_protocol::{
    operator,
    package::{self, PackageRecord, Permission, PACKAGE_RECORD_VERSION},
    registry::{Checkpoint, PackageName, RecordId, RegistryIndex, TimestampedCheckpoint},
    ProtoEnvelope, SerdeEnvelope, Version,
};

mod fs;
pub use fs::*;
24
25#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
27pub struct RegistryDomain(String);
28
29impl RegistryDomain {
30 pub fn new(registry: String) -> Self {
32 Self(registry)
33 }
34
35 pub fn as_str(&self) -> &str {
37 self.0.as_str()
38 }
39}
40
41impl FromStr for RegistryDomain {
42 type Err = Error;
43
44 fn from_str(s: &str) -> std::prelude::v1::Result<Self, Self::Err> {
45 Ok(RegistryDomain(s.to_string()))
46 }
47}
48
49impl fmt::Display for RegistryDomain {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 write!(f, "{registry_domain}", registry_domain = &self.0)
52 }
53}
54
55impl TryFrom<RegistryDomain> for HeaderValue {
56 type Error = Error;
57
58 fn try_from(value: RegistryDomain) -> std::prelude::v1::Result<Self, Self::Error> {
59 Ok(HeaderValue::from_str(&value.to_string())?)
60 }
61}
62
63#[async_trait]
71pub trait RegistryStorage: Send + Sync {
72 async fn reset(&self, all_registries: bool) -> Result<()>;
74
75 async fn load_checkpoint(
79 &self,
80 namespace_registry: Option<&RegistryDomain>,
81 ) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>>;
82
83 async fn store_checkpoint(
85 &self,
86 namespace_registry: Option<&RegistryDomain>,
87 ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
88 ) -> Result<()>;
89
90 async fn load_operator(
94 &self,
95 namespace_registry: Option<&RegistryDomain>,
96 ) -> Result<Option<OperatorInfo>>;
97
98 async fn store_operator(
100 &self,
101 namespace_registry: Option<&RegistryDomain>,
102 operator: OperatorInfo,
103 ) -> Result<()>;
104
105 async fn load_all_packages(&self) -> Result<IndexMap<RegistryDomain, Vec<PackageInfo>>>;
107
108 async fn load_package(
112 &self,
113 namespace_registry: Option<&RegistryDomain>,
114 package: &PackageName,
115 ) -> Result<Option<PackageInfo>>;
116
117 async fn store_package(
119 &self,
120 namespace_registry: Option<&RegistryDomain>,
121 info: &PackageInfo,
122 ) -> Result<()>;
123
124 async fn load_publish(&self) -> Result<Option<PublishInfo>>;
128
129 async fn store_publish(&self, info: Option<&PublishInfo>) -> Result<()>;
133}
134
135#[async_trait]
140pub trait ContentStorage: Send + Sync {
141 async fn clear(&self) -> Result<()>;
143
144 fn content_location(&self, digest: &AnyHash) -> Option<PathBuf>;
149
150 async fn load_content(
154 &self,
155 digest: &AnyHash,
156 ) -> Result<Option<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>>>;
157
158 async fn store_content(
166 &self,
167 stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
168 expected_digest: Option<&AnyHash>,
169 ) -> Result<AnyHash>;
170}
171
172#[async_trait]
177pub trait NamespaceMapStorage: Send + Sync {
178 async fn load_namespace_map(&self) -> Result<Option<IndexMap<String, String>>>;
180 async fn reset_namespaces(&self) -> Result<()>;
182 async fn store_namespace(
184 &self,
185 namespace: String,
186 registry_domain: RegistryDomain,
187 ) -> Result<()>;
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize, Default)]
192#[serde(rename_all = "camelCase")]
193pub struct OperatorInfo {
194 #[serde(default, skip_serializing_if = "Option::is_none")]
196 pub registry: Option<RegistryDomain>,
197 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub checkpoint: Option<Checkpoint>,
200 #[serde(default)]
202 pub state: operator::LogState,
203 #[serde(default, skip_serializing_if = "Option::is_none")]
205 pub head_registry_index: Option<RegistryIndex>,
206 #[serde(default, skip_serializing_if = "Option::is_none")]
208 pub head_fetch_token: Option<String>,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
213#[serde(rename_all = "camelCase")]
214pub struct PackageInfo {
215 pub name: PackageName,
217 #[serde(default, skip_serializing_if = "Option::is_none")]
219 pub registry: Option<RegistryDomain>,
220 #[serde(default, skip_serializing_if = "Option::is_none")]
222 pub checkpoint: Option<Checkpoint>,
223 #[serde(default)]
225 pub state: package::LogState,
226 #[serde(default, skip_serializing_if = "Option::is_none")]
228 pub head_registry_index: Option<RegistryIndex>,
229 #[serde(default, skip_serializing_if = "Option::is_none")]
231 pub head_fetch_token: Option<String>,
232}
233
234impl PackageInfo {
235 pub fn new(name: impl Into<PackageName>) -> Self {
237 Self {
238 name: name.into(),
239 registry: None,
240 checkpoint: None,
241 state: package::LogState::default(),
242 head_registry_index: None,
243 head_fetch_token: None,
244 }
245 }
246}
247
248#[derive(Debug, Clone, Serialize, Deserialize)]
250#[serde(tag = "type", rename_all = "camelCase")]
251pub enum PublishEntry {
252 Init,
254 Release {
256 version: Version,
258 content: AnyHash,
260 },
261 Yank {
263 version: Version,
265 },
266 Grant {
268 key: PublicKey,
270 permissions: Vec<Permission>,
272 },
273 Revoke {
275 key_id: KeyID,
277 permissions: Vec<Permission>,
279 },
280}
281
282#[derive(Debug, Clone, Serialize, Deserialize)]
284#[serde(rename_all = "camelCase")]
285pub struct PublishInfo {
286 pub name: PackageName,
288 pub head: Option<RecordId>,
293 pub entries: Vec<PublishEntry>,
295}
296
297impl PublishInfo {
298 pub fn initializing(&self) -> bool {
300 self.entries.iter().any(|e| matches!(e, PublishEntry::Init))
301 }
302
303 pub(crate) fn finalize(
304 self,
305 signing_key: &signing::PrivateKey,
306 ) -> Result<ProtoEnvelope<PackageRecord>> {
307 let mut entries = Vec::with_capacity(self.entries.len());
308 for entry in self.entries {
309 match entry {
310 PublishEntry::Init => {
311 entries.push(package::PackageEntry::Init {
312 hash_algorithm: HashAlgorithm::Sha256,
313 key: signing_key.public_key(),
314 });
315 }
316 PublishEntry::Release { version, content } => {
317 entries.push(package::PackageEntry::Release { version, content });
318 }
319 PublishEntry::Yank { version } => {
320 entries.push(package::PackageEntry::Yank { version })
321 }
322 PublishEntry::Grant { key, permissions } => {
323 entries.push(package::PackageEntry::GrantFlat { key, permissions })
324 }
325 PublishEntry::Revoke {
326 key_id,
327 permissions,
328 } => entries.push(package::PackageEntry::RevokeFlat {
329 key_id,
330 permissions,
331 }),
332 }
333 }
334
335 let record = package::PackageRecord {
336 prev: self.head,
337 version: PACKAGE_RECORD_VERSION,
338 timestamp: SystemTime::now(),
342 entries,
343 };
344
345 Ok(ProtoEnvelope::signed_contents(signing_key, record)?)
346 }
347}