use futures::Stream;
use indexmap::{IndexMap, IndexSet};
use std::pin::Pin;
use thiserror::Error;
use warg_crypto::{
hash::AnyHash,
signing::{KeyID, Signature},
};
use warg_protocol::{
operator, package,
registry::{
LogId, LogLeaf, PackageName, RecordId, RegistryIndex, RegistryLen, TimestampedCheckpoint,
},
ProtoEnvelope, PublishedProtoEnvelope, SerdeEnvelope,
};
mod memory;
#[cfg(feature = "postgres")]
mod postgres;
pub use memory::*;
#[cfg(feature = "postgres")]
pub use postgres::*;
/// Failures that data store operations report to their callers.
///
/// The human-readable text of each variant is defined by its `#[error]`
/// attribute. Variants whose payload is marked `#[from]` can be produced
/// from the wrapped error type through `From` (and therefore through `?`).
#[derive(Debug, Error)]
pub enum DataStoreError {
    /// A conflicting operation was processed; the message advises updating
    /// to the latest checkpoint and retrying.
    #[error(
        "a conflicting operation was processed: update to the latest checkpoint and try the operation again"
    )]
    Conflict,

    /// No checkpoint was found for the carried log length.
    #[error("checkpoint log length `{0}` was not found")]
    CheckpointNotFound(RegistryLen),

    /// The log with the carried id was not found.
    #[error("log `{0}` was not found")]
    LogNotFound(LogId),

    /// The record with the carried id was not found.
    #[error("record `{0}` was not found")]
    RecordNotFound(RecordId),

    /// No log leaf was found for the carried registry index.
    #[error("log leaf {0} was not found")]
    LogLeafNotFound(RegistryIndex),

    /// The record cannot be validated because it is not in a pending state.
    #[error("record `{0}` cannot be validated as it is not in a pending state")]
    RecordNotPending(RecordId),

    /// The contents associated with a record are invalid; `message` carries
    /// the explanation that is appended to the error text.
    #[error("contents for record `{record_id}` are invalid: {message}")]
    InvalidRecordContents {
        record_id: RecordId,
        message: String,
    },

    /// An operator record was invalid; wraps the operator validation error.
    #[error("the operator record was invalid: {0}")]
    OperatorValidationFailed(#[from] operator::ValidationError),

    /// A package record was invalid; wraps the package validation error.
    #[error("the package record was invalid: {0}")]
    PackageValidationFailed(#[from] package::ValidationError),

    /// The package name collides with an existing one; per the message,
    /// names must be unique in a case insensitive way.
    #[error(
        "the package `{name}` conflicts with package `{existing}`; package names must be unique in a case insensitive way"
    )]
    PackageNameConflict {
        name: PackageName,
        existing: PackageName,
    },

    /// The package namespace collides with an existing one; per the message,
    /// namespaces must be unique in a case insensitive way.
    #[error(
        "the package namespace `{namespace}` conflicts with existing namespace `{existing}`; package namespaces must be unique in a case insensitive way"
    )]
    PackageNamespaceConflict {
        namespace: String,
        existing: String,
    },

    /// The carried package namespace is not defined.
    #[error("the package namespace `{0}` is not defined")]
    PackageNamespaceNotDefined(String),

    /// The carried package namespace is imported from another registry and
    /// therefore cannot accept publishes.
    #[error(
        "the package namespace `{0}` is imported from another registry and cannot accept publishes"
    )]
    PackageNamespaceImported(String),

    /// The carried key id does not have permission.
    #[error("key id `{0}` does not have permission")]
    KeyUnauthorized(KeyID),

    /// The carried key id is unknown.
    #[error("unknown key id `{0}`")]
    UnknownKey(KeyID),

    /// Verification of the carried signature failed.
    #[error("signature `{0}` verification failed")]
    SignatureVerificationFailed(Signature),

    /// The record was rejected; the payload is appended to the error text.
    #[error("the record was rejected: {0}")]
    Rejection(String),

    /// A connection to the PostgreSQL server could not be established.
    ///
    /// Only compiled when the `postgres` feature is enabled.
    #[cfg(feature = "postgres")]
    #[error("a connection could not be established to the PostgreSQL server: {0}")]
    ConnectionPool(#[from] diesel_async::pooled_connection::deadpool::PoolError),

    /// A diesel error, surfaced transparently (its own message is used).
    ///
    /// Only compiled when the `postgres` feature is enabled.
    #[cfg(feature = "postgres")]
    #[error(transparent)]
    Diesel(#[from] diesel::result::Error),
}
/// The state a stored record can be in.
///
/// NOTE(review): the transitions between these states are not defined in
/// this declaration; consult the data store implementations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RecordStatus {
    /// Carries a list of hashes.
    ///
    /// NOTE(review): presumably the digests of content that has not been
    /// provided for the record yet — confirm.
    MissingContent(Vec<AnyHash>),

    /// The record is pending.
    Pending,

    /// The record was rejected.
    ///
    /// NOTE(review): the string is presumably the rejection reason — confirm.
    Rejected(String),

    /// The record was validated.
    Validated,

    /// The record was published.
    Published,
}
/// A record envelope bundled with its status and optional registry index.
///
/// `T` is the record content type wrapped by the envelope and must be
/// `Clone`.
pub struct Record<T: Clone> {
    /// Current status of the record.
    pub status: RecordStatus,

    /// The envelope holding the record contents.
    pub envelope: ProtoEnvelope<T>,

    /// Registry index associated with the record, if any.
    ///
    /// NOTE(review): presumably `None` until the record has been given a
    /// position in the registry log — confirm against the implementations.
    pub registry_index: Option<RegistryIndex>,
}
/// Interface implemented by data stores for checkpoints, log leaves, and
/// operator/package records.
///
/// All methods are async (through `axum::async_trait`) and implementors must
/// be `Send + Sync`. Apart from the debug-only helper at the end, every
/// method reports failure as a [`DataStoreError`].
///
/// NOTE(review): only the signatures are declared here. Remarks below about
/// preconditions or about which error variant is returned are assumptions to
/// be confirmed against the concrete implementations.
#[axum::async_trait]
pub trait DataStore: Send + Sync {
    /// Returns a boxed, pinned, `Send` stream of timestamped checkpoints.
    ///
    /// Obtaining the stream can fail, and each yielded item is itself a
    /// `Result`, so errors may also surface while the stream is consumed.
    ///
    /// NOTE(review): by its name the stream is expected to cover every stored
    /// checkpoint; the ordering is not specified here.
    async fn get_all_checkpoints(
        &self,
    ) -> Result<
        Pin<Box<dyn Stream<Item = Result<TimestampedCheckpoint, DataStoreError>> + Send>>,
        DataStoreError,
    >;

    /// Returns a boxed, pinned, `Send` stream of [`LogLeaf`] values.
    ///
    /// Obtaining the stream can fail, and each yielded item is itself a
    /// `Result`.
    ///
    /// NOTE(review): by its name the leaves correspond to validated records;
    /// confirm which record statuses implementations include.
    async fn get_all_validated_records(
        &self,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<LogLeaf, DataStoreError>> + Send>>, DataStoreError>;

    /// Returns log leaves for the given registry indices.
    ///
    /// NOTE(review): presumably one leaf per entry, in the order of
    /// `entries` — confirm.
    async fn get_log_leafs_with_registry_index(
        &self,
        entries: &[RegistryIndex],
    ) -> Result<Vec<LogLeaf>, DataStoreError>;

    /// Stores an operator record envelope under the given log and record ids.
    async fn store_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        record: &ProtoEnvelope<operator::OperatorRecord>,
    ) -> Result<(), DataStoreError>;

    /// Rejects the identified operator record, supplying a textual `reason`.
    ///
    /// NOTE(review): presumably the record must still be pending — confirm.
    async fn reject_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        reason: &str,
    ) -> Result<(), DataStoreError>;

    /// Commits the identified operator record together with the given
    /// `registry_index`.
    ///
    /// NOTE(review): presumably the record must still be pending and is
    /// validated as part of the commit — confirm.
    async fn commit_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        registry_index: RegistryIndex,
    ) -> Result<(), DataStoreError>;

    /// Stores a package record envelope under the given log and record ids,
    /// along with the package name and a set of content hashes.
    ///
    /// NOTE(review): `missing` presumably lists digests of content that has
    /// not been provided yet — confirm.
    async fn store_package_record(
        &self,
        log_id: &LogId,
        package_name: &PackageName,
        record_id: &RecordId,
        record: &ProtoEnvelope<package::PackageRecord>,
        missing: &IndexSet<&AnyHash>,
    ) -> Result<(), DataStoreError>;

    /// Rejects the identified package record, supplying a textual `reason`.
    ///
    /// NOTE(review): presumably the record must still be pending — confirm.
    async fn reject_package_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        reason: &str,
    ) -> Result<(), DataStoreError>;

    /// Commits the identified package record together with the given
    /// `registry_index`.
    ///
    /// NOTE(review): presumably the record must still be pending and is
    /// validated as part of the commit — confirm.
    async fn commit_package_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        registry_index: RegistryIndex,
    ) -> Result<(), DataStoreError>;

    /// Reports whether the content identified by `digest` is missing for the
    /// identified record.
    async fn is_content_missing(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        digest: &AnyHash,
    ) -> Result<bool, DataStoreError>;

    /// Marks the content identified by `digest` as present for the
    /// identified record.
    ///
    /// NOTE(review): the meaning of the returned `bool` is not visible from
    /// this declaration — confirm against the implementations.
    async fn set_content_present(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        digest: &AnyHash,
    ) -> Result<bool, DataStoreError>;

    /// Stores a timestamped checkpoint envelope under `checkpoint_id`.
    ///
    /// The envelope is taken by value.
    async fn store_checkpoint(
        &self,
        checkpoint_id: &AnyHash,
        ts_checkpoint: SerdeEnvelope<TimestampedCheckpoint>,
    ) -> Result<(), DataStoreError>;

    /// Returns the latest timestamped checkpoint envelope.
    async fn get_latest_checkpoint(
        &self,
    ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;

    /// Returns the timestamped checkpoint envelope for the given log length.
    async fn get_checkpoint(
        &self,
        log_length: RegistryLen,
    ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;

    /// Returns a map from log id to an optional package name for the given
    /// log ids.
    ///
    /// NOTE(review): `None` presumably means no package name is available
    /// for that log id — confirm.
    async fn get_package_names(
        &self,
        log_ids: &[LogId],
    ) -> Result<IndexMap<LogId, Option<PackageName>>, DataStoreError>;

    /// Returns pairs of registry index and log leaf, given a starting index
    /// and a `limit`.
    ///
    /// NOTE(review): presumably at most `limit` pairs beginning at
    /// `starting_index` — confirm whether the start is inclusive.
    async fn get_log_leafs_starting_with_registry_index(
        &self,
        starting_index: RegistryIndex,
        limit: usize,
    ) -> Result<Vec<(RegistryIndex, LogLeaf)>, DataStoreError>;

    /// Returns published operator record envelopes from the given log.
    ///
    /// NOTE(review): presumably restricted to records within
    /// `registry_log_length`, starting after `since` when it is provided,
    /// and capped at `limit` entries — confirm.
    async fn get_operator_records(
        &self,
        log_id: &LogId,
        registry_log_length: RegistryLen,
        since: Option<&RecordId>,
        limit: u16,
    ) -> Result<Vec<PublishedProtoEnvelope<operator::OperatorRecord>>, DataStoreError>;

    /// Returns published package record envelopes from the given log.
    ///
    /// NOTE(review): presumably restricted to records within
    /// `registry_log_length`, starting after `since` when it is provided,
    /// and capped at `limit` entries — confirm.
    async fn get_package_records(
        &self,
        log_id: &LogId,
        registry_log_length: RegistryLen,
        since: Option<&RecordId>,
        limit: u16,
    ) -> Result<Vec<PublishedProtoEnvelope<package::PackageRecord>>, DataStoreError>;

    /// Returns the identified operator record as a [`Record`], which carries
    /// its status, envelope, and optional registry index.
    async fn get_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
    ) -> Result<Record<operator::OperatorRecord>, DataStoreError>;

    /// Returns the identified package record as a [`Record`], which carries
    /// its status, envelope, and optional registry index.
    async fn get_package_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
    ) -> Result<Record<package::PackageRecord>, DataStoreError>;

    /// Verifies the signature of a package record envelope for the given log.
    ///
    /// Returns `Ok(())` on success.
    ///
    /// NOTE(review): presumably only the envelope signature is checked, not
    /// the record contents — confirm.
    async fn verify_package_record_signature(
        &self,
        log_id: &LogId,
        record: &ProtoEnvelope<package::PackageRecord>,
    ) -> Result<(), DataStoreError>;

    /// Verifies that `package_name` can be published, given the operator
    /// log id.
    ///
    /// Returns `Ok(())` when publishing is permitted.
    ///
    /// NOTE(review): presumably this is where the package name/namespace
    /// conflict, undefined-namespace, and imported-namespace errors
    /// originate — confirm.
    async fn verify_can_publish_package(
        &self,
        operator_log_id: &LogId,
        package_name: &PackageName,
    ) -> Result<(), DataStoreError>;

    /// Verifies the signature of a timestamped checkpoint envelope, given the
    /// operator log id.
    ///
    /// Returns `Ok(())` on success.
    async fn verify_timestamped_checkpoint_signature(
        &self,
        operator_log_id: &LogId,
        ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
    ) -> Result<(), DataStoreError>;

    // Debug-only helper that returns package names. It is compiled only with
    // the `debug` feature and hidden from the generated docs. The default
    // implementation always fails with "not implemented", so implementors
    // opt in by overriding it.
    #[cfg(feature = "debug")]
    #[doc(hidden)]
    async fn debug_list_package_names(&self) -> anyhow::Result<Vec<PackageName>> {
        Err(anyhow::anyhow!("not implemented"))
    }
}