warg_server/datastore/
mod.rs

1use futures::Stream;
2use indexmap::{IndexMap, IndexSet};
3use std::pin::Pin;
4use thiserror::Error;
5use warg_crypto::{
6    hash::AnyHash,
7    signing::{KeyID, Signature},
8};
9use warg_protocol::{
10    operator, package,
11    registry::{
12        LogId, LogLeaf, PackageName, RecordId, RegistryIndex, RegistryLen, TimestampedCheckpoint,
13    },
14    ProtoEnvelope, PublishedProtoEnvelope, SerdeEnvelope,
15};
16
17mod memory;
18#[cfg(feature = "postgres")]
19mod postgres;
20
21pub use memory::*;
22#[cfg(feature = "postgres")]
23pub use postgres::*;
24
/// Errors returned by [`DataStore`] operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute (generated by `thiserror`).
25#[derive(Debug, Error)]
26pub enum DataStoreError {
    /// A conflicting operation was processed first; per the message, the
    /// caller should update to the latest checkpoint and retry.
27    #[error("a conflicting operation was processed: update to the latest checkpoint and try the operation again")]
28    Conflict,
29
    /// No checkpoint was found for the given registry log length.
30    #[error("checkpoint log length `{0}` was not found")]
31    CheckpointNotFound(RegistryLen),
32
    /// The log with the given ID was not found.
33    #[error("log `{0}` was not found")]
34    LogNotFound(LogId),
35
    /// The record with the given ID was not found.
36    #[error("record `{0}` was not found")]
37    RecordNotFound(RecordId),
38
    /// No log leaf was found for the given registry index.
39    #[error("log leaf {0} was not found")]
40    LogLeafNotFound(RegistryIndex),
41
    /// The record is not in a pending state, so it cannot be validated.
42    #[error("record `{0}` cannot be validated as it is not in a pending state")]
43    RecordNotPending(RecordId),
44
    /// The contents of the identified record are invalid; `message` is
    /// interpolated into the error text as the explanation.
45    #[error("contents for record `{record_id}` are invalid: {message}")]
46    InvalidRecordContents {
47        record_id: RecordId,
48        message: String,
49    },
50
    /// An operator record failed validation.
    ///
    /// `#[from]` lets an `operator::ValidationError` convert into this
    /// variant (e.g. via `?`).
51    #[error("the operator record was invalid: {0}")]
52    OperatorValidationFailed(#[from] operator::ValidationError),
53
    /// A package record failed validation.
    ///
    /// `#[from]` lets a `package::ValidationError` convert into this
    /// variant (e.g. via `?`).
54    #[error("the package record was invalid: {0}")]
55    PackageValidationFailed(#[from] package::ValidationError),
56
    /// The named package namespace is not defined.
57    #[error("the package namespace `{0}` is not defined")]
58    PackageNamespaceNotDefined(String),
59
    /// The named package namespace is imported from another registry and
    /// therefore cannot accept publishes.
60    #[error(
61        "the package namespace `{0}` is imported from another registry and cannot accept publishes"
62    )]
63    PackageNamespaceImported(String),
64
    /// The key with the given ID does not have permission.
65    #[error("key id `{0}` does not have permission")]
66    KeyUnauthorized(KeyID),
67
    /// The given key ID is unknown.
68    #[error("unknown key id `{0}`")]
69    UnknownKey(KeyID),
70
    /// Verification of the given signature failed.
71    #[error("signature `{0}` verification failed")]
72    SignatureVerificationFailed(Signature),
73
    /// The record was rejected; the string is appended to the message
    /// (presumably the rejection reason — confirm against callers).
74    #[error("the record was rejected: {0}")]
75    Rejection(String),
76
    /// A connection to the PostgreSQL server could not be established.
    ///
    /// Only compiled with the `postgres` feature.
77    #[cfg(feature = "postgres")]
78    #[error("a connection could not be established to the PostgreSQL server: {0}")]
79    ConnectionPool(#[from] diesel_async::pooled_connection::deadpool::PoolError),
80
    /// An error produced by Diesel; `#[error(transparent)]` forwards the
    /// inner error's `Display` and source unchanged.
    ///
    /// Only compiled with the `postgres` feature.
81    #[cfg(feature = "postgres")]
82    #[error(transparent)]
83    Diesel(#[from] diesel::result::Error),
84}
85
86/// Represents the status of a record held by a data store.
87#[derive(Debug, Clone, Eq, PartialEq)]
88pub enum RecordStatus {
89    /// The record is pending with missing content.
    ///
    /// NOTE(review): the hashes are presumably the digests of the content
    /// that is still missing — confirm against the implementations.
90    MissingContent(Vec<AnyHash>),
91    /// The record is pending with all content present.
92    Pending,
93    /// The record was rejected.
    ///
    /// NOTE(review): the string is presumably the rejection reason — confirm
    /// against the implementations.
94    Rejected(String),
95    /// The record has been validated.
96    Validated,
97    /// The record was published (i.e. included in a registry checkpoint).
98    Published,
99}
100
101/// Represents a record in a log.
///
/// `T` is the record type carried by the envelope; within this module the
/// `DataStore` trait returns `Record<operator::OperatorRecord>` and
/// `Record<package::PackageRecord>`.
102pub struct Record<T>
103where
104    T: Clone,
105{
106    /// The status of the record.
107    pub status: RecordStatus,
108    /// The envelope containing the record contents.
109    pub envelope: ProtoEnvelope<T>,
110    /// The index of the record in the registry log.
111    ///
112    /// This is `None` if the record is not published.
113    pub registry_index: Option<RegistryIndex>,
114}
115
116/// Implemented by data stores.
///
/// Implementations must be `Send + Sync`. Methods are declared `async`
/// through `axum::async_trait`, and all but the debug-only helper report
/// failures as [`DataStoreError`].
117#[axum::async_trait]
118pub trait DataStore: Send + Sync {
119    /// Gets a stream of all checkpoints.
120    ///
121    /// This is an expensive operation and should only be performed on startup.
    ///
    /// A failure can be reported either by the outer `Result` or by an
    /// individual item yielded from the stream.
122    async fn get_all_checkpoints(
123        &self,
124    ) -> Result<
125        Pin<Box<dyn Stream<Item = Result<TimestampedCheckpoint, DataStoreError>> + Send>>,
126        DataStoreError,
127    >;
128
129    /// Gets a stream of all validated records, yielded as log leafs.
130    ///
131    /// This is an expensive operation and should only be performed on startup.
    ///
    /// A failure can be reported either by the outer `Result` or by an
    /// individual item yielded from the stream.
132    async fn get_all_validated_records(
133        &self,
134    ) -> Result<Pin<Box<dyn Stream<Item = Result<LogLeaf, DataStoreError>> + Send>>, DataStoreError>;
135
136    /// Looks up the log_id and record_id from the registry log index.
    ///
    /// Takes a slice of registry indices and returns the matching log leafs.
137    async fn get_log_leafs_with_registry_index(
138        &self,
139        entries: &[RegistryIndex],
140    ) -> Result<Vec<LogLeaf>, DataStoreError>;
141
142    /// Stores the given operator record.
143    async fn store_operator_record(
144        &self,
145        log_id: &LogId,
146        record_id: &RecordId,
147        record: &ProtoEnvelope<operator::OperatorRecord>,
148    ) -> Result<(), DataStoreError>;
149
150    /// Rejects the given operator record.
151    ///
152    /// The record must be in the pending state.
153    async fn reject_operator_record(
154        &self,
155        log_id: &LogId,
156        record_id: &RecordId,
157        reason: &str,
158    ) -> Result<(), DataStoreError>;
159
160    /// Commits the given operator record.
161    ///
162    /// The record must be in a pending state.
163    ///
164    /// If validation succeeds, the record will be considered part of the log.
    ///
    /// NOTE(review): `registry_index` is presumably the index assigned to the
    /// record in the registry log — confirm against the implementations.
165    async fn commit_operator_record(
166        &self,
167        log_id: &LogId,
168        record_id: &RecordId,
169        registry_index: RegistryIndex,
170    ) -> Result<(), DataStoreError>;
171
172    /// Stores the given package record.
173    ///
174    /// The `missing` set is the set of content digests that are currently
175    /// missing from data storage.
176    async fn store_package_record(
177        &self,
178        log_id: &LogId,
179        package_name: &PackageName,
180        record_id: &RecordId,
181        record: &ProtoEnvelope<package::PackageRecord>,
182        missing: &IndexSet<&AnyHash>,
183    ) -> Result<(), DataStoreError>;
184
185    /// Rejects the given package record.
186    ///
187    /// The record must be in the pending state.
188    async fn reject_package_record(
189        &self,
190        log_id: &LogId,
191        record_id: &RecordId,
192        reason: &str,
193    ) -> Result<(), DataStoreError>;
194
195    /// Commits the given package record.
196    ///
197    /// The record must be in a pending state.
198    ///
199    /// If validation succeeds, the record will be considered part of the log.
    ///
    /// NOTE(review): `registry_index` is presumably the index assigned to the
    /// record in the registry log — confirm against the implementations.
200    async fn commit_package_record(
201        &self,
202        log_id: &LogId,
203        record_id: &RecordId,
204        registry_index: RegistryIndex,
205    ) -> Result<(), DataStoreError>;
206
207    /// Determines if the given content digest is missing for the record.
208    ///
209    /// The record must be in a pending state.
210    async fn is_content_missing(
211        &self,
212        log_id: &LogId,
213        record_id: &RecordId,
214        digest: &AnyHash,
215    ) -> Result<bool, DataStoreError>;
216
217    /// Sets the present flag for the given record and content digest.
218    ///
219    /// The record must be in a pending state.
220    ///
221    /// Returns true if the record has all of its content present as a
222    /// result of this update.
223    ///
224    /// Returns false if the given digest was already marked present.
225    async fn set_content_present(
226        &self,
227        log_id: &LogId,
228        record_id: &RecordId,
229        digest: &AnyHash,
230    ) -> Result<bool, DataStoreError>;
231
232    /// Stores a new checkpoint.
233    async fn store_checkpoint(
234        &self,
235        checkpoint_id: &AnyHash,
236        ts_checkpoint: SerdeEnvelope<TimestampedCheckpoint>,
237    ) -> Result<(), DataStoreError>;
238
239    /// Gets the latest checkpoint.
240    async fn get_latest_checkpoint(
241        &self,
242    ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;
243
244    /// Gets the checkpoint for the given registry log length.
245    async fn get_checkpoint(
246        &self,
247        log_length: RegistryLen,
248    ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;
249
250    /// Gets package names from log IDs. If package name is unavailable, a corresponding `None` is returned.
251    async fn get_package_names(
252        &self,
253        log_ids: &[LogId],
254    ) -> Result<IndexMap<LogId, Option<PackageName>>, DataStoreError>;
255
256    /// Gets a batch of log leafs starting with a registry log index.
    ///
    /// Each returned leaf is paired with its registry index.
    ///
    /// NOTE(review): `limit` presumably caps the batch size — confirm against
    /// the implementations.
257    async fn get_log_leafs_starting_with_registry_index(
258        &self,
259        starting_index: RegistryIndex,
260        limit: usize,
261    ) -> Result<Vec<(RegistryIndex, LogLeaf)>, DataStoreError>;
262
263    /// Gets the operator records for the given registry log length.
    ///
    /// NOTE(review): `since` presumably restricts the result to records after
    /// the given record ID and `limit` bounds the number returned — confirm
    /// against the implementations.
264    async fn get_operator_records(
265        &self,
266        log_id: &LogId,
267        registry_log_length: RegistryLen,
268        since: Option<&RecordId>,
269        limit: u16,
270    ) -> Result<Vec<PublishedProtoEnvelope<operator::OperatorRecord>>, DataStoreError>;
271
272    /// Gets the package records for the given registry log length.
    ///
    /// NOTE(review): `since` presumably restricts the result to records after
    /// the given record ID and `limit` bounds the number returned — confirm
    /// against the implementations.
273    async fn get_package_records(
274        &self,
275        log_id: &LogId,
276        registry_log_length: RegistryLen,
277        since: Option<&RecordId>,
278        limit: u16,
279    ) -> Result<Vec<PublishedProtoEnvelope<package::PackageRecord>>, DataStoreError>;
280
281    /// Gets an operator record, including its status and registry index.
282    async fn get_operator_record(
283        &self,
284        log_id: &LogId,
285        record_id: &RecordId,
286    ) -> Result<Record<operator::OperatorRecord>, DataStoreError>;
287
288    /// Gets a package record, including its status and registry index.
289    async fn get_package_record(
290        &self,
291        log_id: &LogId,
292        record_id: &RecordId,
293    ) -> Result<Record<package::PackageRecord>, DataStoreError>;
294
295    /// Verifies the signature of a package record.
296    ///
297    /// This is different from validating the package record in that
298    /// only the signature on the envelope is verified.
299    ///
300    /// It does not attempt to validate the record itself.
    ///
    /// NOTE(review): earlier wording referred to `validate_package_record`,
    /// which is not declared on this trait.
301    async fn verify_package_record_signature(
302        &self,
303        log_id: &LogId,
304        record: &ProtoEnvelope<package::PackageRecord>,
305    ) -> Result<(), DataStoreError>;
306
307    /// Verifies the package name is unique in a case-insensitive way and that the
308    /// package namespace is defined for this registry and is not imported
309    /// from another registry.
310    async fn verify_can_publish_package(
311        &self,
312        operator_log_id: &LogId,
313        package_name: &PackageName,
314    ) -> Result<(), DataStoreError>;
315
316    /// Verifies the `TimestampedCheckpoint` signature.
317    async fn verify_timestamped_checkpoint_signature(
318        &self,
319        operator_log_id: &LogId,
320        ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
321    ) -> Result<(), DataStoreError>;
322
323    // Returns a list of package names, for debugging only.
    //
    // Only compiled with the `debug` feature. The default implementation
    // always fails with "not implemented"; data stores override it to
    // provide a real listing.
324    #[cfg(feature = "debug")]
325    #[doc(hidden)]
326    async fn debug_list_package_names(&self) -> anyhow::Result<Vec<PackageName>> {
327        anyhow::bail!("not implemented")
328    }
329}