use anyhow::{Error, Result};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::Stream;
use indexmap::IndexMap;
use reqwest::header::HeaderValue;
use serde::{Deserialize, Serialize};
use std::{fmt, path::PathBuf, pin::Pin, str::FromStr, time::SystemTime};
use warg_crypto::{
hash::{AnyHash, HashAlgorithm},
signing::{self, KeyID, PublicKey},
};
use warg_protocol::{
operator,
package::{self, PackageRecord, Permission, PACKAGE_RECORD_VERSION},
registry::{Checkpoint, PackageName, RecordId, RegistryIndex, TimestampedCheckpoint},
ProtoEnvelope, SerdeEnvelope, Version,
};
mod fs;
pub use fs::*;
#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct RegistryDomain(String);
impl RegistryDomain {
pub fn new(registry: String) -> Self {
Self(registry)
}
pub fn as_str(&self) -> &str {
self.0.as_str()
}
}
impl FromStr for RegistryDomain {
type Err = Error;
fn from_str(s: &str) -> std::prelude::v1::Result<Self, Self::Err> {
Ok(RegistryDomain(s.to_string()))
}
}
impl fmt::Display for RegistryDomain {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{registry_domain}", registry_domain = &self.0)
}
}
impl TryFrom<RegistryDomain> for HeaderValue {
type Error = Error;
fn try_from(value: RegistryDomain) -> std::prelude::v1::Result<Self, Self::Error> {
Ok(HeaderValue::from_str(&value.to_string())?)
}
}
#[async_trait]
pub trait RegistryStorage: Send + Sync {
async fn reset(&self, all_registries: bool) -> Result<()>;
async fn load_checkpoint(
&self,
namespace_registry: Option<&RegistryDomain>,
) -> Result<Option<SerdeEnvelope<TimestampedCheckpoint>>>;
async fn store_checkpoint(
&self,
namespace_registry: Option<&RegistryDomain>,
ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
) -> Result<()>;
async fn load_operator(
&self,
namespace_registry: Option<&RegistryDomain>,
) -> Result<Option<OperatorInfo>>;
async fn store_operator(
&self,
namespace_registry: Option<&RegistryDomain>,
operator: OperatorInfo,
) -> Result<()>;
async fn load_all_packages(&self) -> Result<IndexMap<RegistryDomain, Vec<PackageInfo>>>;
async fn load_package(
&self,
namespace_registry: Option<&RegistryDomain>,
package: &PackageName,
) -> Result<Option<PackageInfo>>;
async fn store_package(
&self,
namespace_registry: Option<&RegistryDomain>,
info: &PackageInfo,
) -> Result<()>;
async fn load_publish(&self) -> Result<Option<PublishInfo>>;
async fn store_publish(&self, info: Option<&PublishInfo>) -> Result<()>;
}
#[async_trait]
pub trait ContentStorage: Send + Sync {
async fn clear(&self) -> Result<()>;
fn content_location(&self, digest: &AnyHash) -> Option<PathBuf>;
async fn load_content(
&self,
digest: &AnyHash,
) -> Result<Option<Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>>>;
async fn store_content(
&self,
stream: Pin<Box<dyn Stream<Item = Result<Bytes>> + Send + Sync>>,
expected_digest: Option<&AnyHash>,
) -> Result<AnyHash>;
}
#[async_trait]
pub trait NamespaceMapStorage: Send + Sync {
async fn load_namespace_map(&self) -> Result<Option<IndexMap<String, String>>>;
async fn reset_namespaces(&self) -> Result<()>;
async fn store_namespace(
&self,
namespace: String,
registry_domain: RegistryDomain,
) -> Result<()>;
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct OperatorInfo {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub registry: Option<RegistryDomain>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<Checkpoint>,
#[serde(default)]
pub state: operator::LogState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub head_registry_index: Option<RegistryIndex>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub head_fetch_token: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PackageInfo {
pub name: PackageName,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub registry: Option<RegistryDomain>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub checkpoint: Option<Checkpoint>,
#[serde(default)]
pub state: package::LogState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub head_registry_index: Option<RegistryIndex>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub head_fetch_token: Option<String>,
}
impl PackageInfo {
pub fn new(name: impl Into<PackageName>) -> Self {
Self {
name: name.into(),
registry: None,
checkpoint: None,
state: package::LogState::default(),
head_registry_index: None,
head_fetch_token: None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum PublishEntry {
Init,
Release {
version: Version,
content: AnyHash,
},
Yank {
version: Version,
},
Grant {
key: PublicKey,
permissions: Vec<Permission>,
},
Revoke {
key_id: KeyID,
permissions: Vec<Permission>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PublishInfo {
pub name: PackageName,
pub head: Option<RecordId>,
pub entries: Vec<PublishEntry>,
}
impl PublishInfo {
pub fn initializing(&self) -> bool {
self.entries.iter().any(|e| matches!(e, PublishEntry::Init))
}
pub(crate) fn finalize(
self,
signing_key: &signing::PrivateKey,
) -> Result<ProtoEnvelope<PackageRecord>> {
let mut entries = Vec::with_capacity(self.entries.len());
for entry in self.entries {
match entry {
PublishEntry::Init => {
entries.push(package::PackageEntry::Init {
hash_algorithm: HashAlgorithm::Sha256,
key: signing_key.public_key(),
});
}
PublishEntry::Release { version, content } => {
entries.push(package::PackageEntry::Release { version, content });
}
PublishEntry::Yank { version } => {
entries.push(package::PackageEntry::Yank { version })
}
PublishEntry::Grant { key, permissions } => {
entries.push(package::PackageEntry::GrantFlat { key, permissions })
}
PublishEntry::Revoke {
key_id,
permissions,
} => entries.push(package::PackageEntry::RevokeFlat {
key_id,
permissions,
}),
}
}
let record = package::PackageRecord {
prev: self.head,
version: PACKAGE_RECORD_VERSION,
timestamp: SystemTime::now(),
entries,
};
Ok(ProtoEnvelope::signed_contents(signing_key, record)?)
}
}