1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
use futures::Stream;
use indexmap::{IndexMap, IndexSet};
use std::pin::Pin;
use thiserror::Error;
use warg_crypto::{
    hash::AnyHash,
    signing::{KeyID, Signature},
};
use warg_protocol::{
    operator, package,
    registry::{
        LogId, LogLeaf, PackageName, RecordId, RegistryIndex, RegistryLen, TimestampedCheckpoint,
    },
    ProtoEnvelope, PublishedProtoEnvelope, SerdeEnvelope,
};

mod memory;
#[cfg(feature = "postgres")]
mod postgres;

pub use memory::*;
#[cfg(feature = "postgres")]
pub use postgres::*;

/// Errors returned by [`DataStore`] operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
/// The last two variants only exist when the `postgres` feature is enabled and
/// wrap errors from the PostgreSQL backend.
#[derive(Debug, Error)]
pub enum DataStoreError {
    /// A conflicting operation was processed; per the message, the caller
    /// should update to the latest checkpoint and try the operation again.
    #[error("a conflicting operation was processed: update to the latest checkpoint and try the operation again")]
    Conflict,

    /// No checkpoint exists for the given registry log length.
    #[error("checkpoint log length `{0}` was not found")]
    CheckpointNotFound(RegistryLen),

    /// The log with the given ID was not found.
    #[error("log `{0}` was not found")]
    LogNotFound(LogId),

    /// The record with the given ID was not found.
    #[error("record `{0}` was not found")]
    RecordNotFound(RecordId),

    /// No log leaf was found at the given registry log index.
    #[error("log leaf {0} was not found")]
    LogLeafNotFound(RegistryIndex),

    /// The record is not in a pending state.
    // NOTE(review): presumably returned by the operations documented on
    // `DataStore` as requiring a pending record — confirm in implementations.
    #[error("record `{0}` cannot be validated as it is not in a pending state")]
    RecordNotPending(RecordId),

    /// The contents of the given record are invalid; `message` explains why.
    #[error("contents for record `{record_id}` are invalid: {message}")]
    InvalidRecordContents {
        record_id: RecordId,
        message: String,
    },

    /// An operator record failed validation.
    ///
    /// Created automatically from [`operator::ValidationError`] via `#[from]`.
    #[error("the operator record was invalid: {0}")]
    OperatorValidationFailed(#[from] operator::ValidationError),

    /// A package record failed validation.
    ///
    /// Created automatically from [`package::ValidationError`] via `#[from]`.
    #[error("the package record was invalid: {0}")]
    PackageValidationFailed(#[from] package::ValidationError),

    /// The package name `name` conflicts case-insensitively with the already
    /// existing package name `existing`.
    #[error("the package `{name}` conflicts with package `{existing}`; package names must be unique in a case insensitive way")]
    PackageNameConflict {
        name: PackageName,
        existing: PackageName,
    },

    /// The package namespace `namespace` conflicts case-insensitively with the
    /// already existing namespace `existing`.
    #[error("the package namespace `{namespace}` conflicts with existing namespace `{existing}`; package namespaces must be unique in a case insensitive way")]
    PackageNamespaceConflict { namespace: String, existing: String },

    /// The given package namespace is not defined.
    #[error("the package namespace `{0}` is not defined")]
    PackageNamespaceNotDefined(String),

    /// The given package namespace is imported from another registry, so
    /// publishing to it is not allowed.
    #[error(
        "the package namespace `{0}` is imported from another registry and cannot accept publishes"
    )]
    PackageNamespaceImported(String),

    /// The given signing key does not have the required permission.
    #[error("key id `{0}` does not have permission")]
    KeyUnauthorized(KeyID),

    /// The given signing key is not known.
    #[error("unknown key id `{0}`")]
    UnknownKey(KeyID),

    /// Verification of the given signature failed.
    #[error("signature `{0}` verification failed")]
    SignatureVerificationFailed(Signature),

    /// The record was rejected; the string carries the rejection message.
    #[error("the record was rejected: {0}")]
    Rejection(String),

    /// (`postgres` feature only) A connection to the PostgreSQL server could
    /// not be obtained from the connection pool.
    ///
    /// Created automatically from the deadpool `PoolError` via `#[from]`.
    #[cfg(feature = "postgres")]
    #[error("a connection could not be established to the PostgreSQL server: {0}")]
    ConnectionPool(#[from] diesel_async::pooled_connection::deadpool::PoolError),

    /// (`postgres` feature only) An error reported by Diesel.
    ///
    /// `#[error(transparent)]` forwards `Display` and `source` to the wrapped
    /// error. Created automatically via `#[from]`.
    #[cfg(feature = "postgres")]
    #[error(transparent)]
    Diesel(#[from] diesel::result::Error),
}

/// Lifecycle state of a record held by a [`DataStore`].
///
/// A record starts out pending (possibly still waiting on content). It then
/// ends up rejected, validated, or published in a registry checkpoint.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RecordStatus {
    /// Pending; the listed content digests have not been supplied yet.
    MissingContent(Vec<AnyHash>),
    /// Pending; every piece of referenced content is present.
    Pending,
    /// Rejected; the string carries the rejection reason.
    Rejected(String),
    /// Successfully validated.
    Validated,
    /// Published, i.e. included in a registry checkpoint.
    Published,
}

/// A record in a log, together with its current status.
///
/// Returned by [`DataStore::get_operator_record`] and
/// [`DataStore::get_package_record`].
// `Debug` is derived so records can be formatted in logs, assertions and
// error context. The derive only applies when `T: Debug`, so it adds no new
// requirement for callers.
#[derive(Debug)]
pub struct Record<T>
where
    T: Clone,
{
    /// The current status of the record.
    pub status: RecordStatus,
    /// The envelope containing the record contents.
    pub envelope: ProtoEnvelope<T>,
    /// The index of the record in the registry log.
    ///
    /// This is `None` if the record is not published.
    pub registry_index: Option<RegistryIndex>,
}

/// Implemented by data stores.
///
/// A data store stores operator and package records along with their status,
/// tracks which content digests a pending package record is still missing,
/// and stores registry checkpoints. Implementations must be `Send + Sync`.
///
/// The trait is declared with `#[axum::async_trait]`. That macro rewrites each
/// `async fn` below into a method returning a pinned, boxed future.
#[axum::async_trait]
pub trait DataStore: Send + Sync {
    /// Gets a stream of all checkpoints.
    ///
    /// This is an expensive operation and should only be performed on startup.
    ///
    /// The outer `Result` fails if the stream cannot be created. Each streamed
    /// item is itself a `Result` and may fail individually.
    async fn get_all_checkpoints(
        &self,
    ) -> Result<
        Pin<Box<dyn Stream<Item = Result<TimestampedCheckpoint, DataStoreError>> + Send>>,
        DataStoreError,
    >;

    /// Gets a stream of all validated records, each yielded as a [`LogLeaf`].
    ///
    /// This is an expensive operation and should only be performed on startup.
    ///
    /// Both creating the stream and each streamed item are fallible.
    async fn get_all_validated_records(
        &self,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<LogLeaf, DataStoreError>> + Send>>, DataStoreError>;

    /// Looks up the log leaf (log ID and record ID) for each of the given
    /// registry log indexes.
    // NOTE(review): presumably the result has the same order as `entries` and
    // an unknown index yields `DataStoreError::LogLeafNotFound` — confirm
    // against the implementations.
    async fn get_log_leafs_with_registry_index(
        &self,
        entries: &[RegistryIndex],
    ) -> Result<Vec<LogLeaf>, DataStoreError>;

    /// Stores the given operator record.
    // NOTE(review): presumably the record is stored in a pending state, since
    // `reject_operator_record` and `commit_operator_record` require one — confirm.
    async fn store_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        record: &ProtoEnvelope<operator::OperatorRecord>,
    ) -> Result<(), DataStoreError>;

    /// Rejects the given operator record, recording `reason`.
    ///
    /// The record must be in the pending state.
    async fn reject_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        reason: &str,
    ) -> Result<(), DataStoreError>;

    /// Commits the given operator record.
    ///
    /// The record must be in a pending state.
    ///
    /// If validation succeeds, the record will be considered part of the log.
    /// `registry_index` is the record's index in the registry log (compare
    /// [`Record::registry_index`]).
    async fn commit_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        registry_index: RegistryIndex,
    ) -> Result<(), DataStoreError>;

    /// Stores the given package record.
    ///
    /// The `missing` set is the set of content digests that are currently
    /// missing from data storage.
    async fn store_package_record(
        &self,
        log_id: &LogId,
        package_name: &PackageName,
        record_id: &RecordId,
        record: &ProtoEnvelope<package::PackageRecord>,
        missing: &IndexSet<&AnyHash>,
    ) -> Result<(), DataStoreError>;

    /// Rejects the given package record, recording `reason`.
    ///
    /// The record must be in the pending state.
    async fn reject_package_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        reason: &str,
    ) -> Result<(), DataStoreError>;

    /// Commits the given package record.
    ///
    /// The record must be in a pending state.
    ///
    /// If validation succeeds, the record will be considered part of the log.
    /// `registry_index` is the record's index in the registry log (compare
    /// [`Record::registry_index`]).
    async fn commit_package_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        registry_index: RegistryIndex,
    ) -> Result<(), DataStoreError>;

    /// Determines if the given content digest is missing for the record.
    ///
    /// The record must be in a pending state.
    async fn is_content_missing(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        digest: &AnyHash,
    ) -> Result<bool, DataStoreError>;

    /// Sets the present flag for the given record and content digest.
    ///
    /// The record must be in a pending state.
    ///
    /// Returns true if the record has all of its content present as a
    /// result of this update.
    ///
    /// Returns false if the given digest was already marked present.
    async fn set_content_present(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
        digest: &AnyHash,
    ) -> Result<bool, DataStoreError>;

    /// Stores a new checkpoint under the given checkpoint ID.
    async fn store_checkpoint(
        &self,
        checkpoint_id: &AnyHash,
        ts_checkpoint: SerdeEnvelope<TimestampedCheckpoint>,
    ) -> Result<(), DataStoreError>;

    /// Gets the latest checkpoint.
    async fn get_latest_checkpoint(
        &self,
    ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;

    /// Gets the checkpoint for the given registry log length.
    // NOTE(review): presumably fails with `DataStoreError::CheckpointNotFound`
    // when no checkpoint has that length — confirm in implementations.
    async fn get_checkpoint(
        &self,
        log_length: RegistryLen,
    ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;

    /// Gets package names from log IDs. If package name is unavailable, a corresponding `None` is returned.
    async fn get_package_names(
        &self,
        log_ids: &[LogId],
    ) -> Result<IndexMap<LogId, Option<PackageName>>, DataStoreError>;

    /// Gets a batch of log leafs, starting at the given registry log index.
    ///
    /// Each returned leaf is paired with its registry log index.
    // NOTE(review): presumably `limit` caps the batch size — confirm.
    async fn get_log_leafs_starting_with_registry_index(
        &self,
        starting_index: RegistryIndex,
        limit: usize,
    ) -> Result<Vec<(RegistryIndex, LogLeaf)>, DataStoreError>;

    /// Gets the operator records for the given registry log length.
    // NOTE(review): presumably returns records after `since` (or from the start
    // of the log when `None`), at most `limit` of them — confirm.
    async fn get_operator_records(
        &self,
        log_id: &LogId,
        registry_log_length: RegistryLen,
        since: Option<&RecordId>,
        limit: u16,
    ) -> Result<Vec<PublishedProtoEnvelope<operator::OperatorRecord>>, DataStoreError>;

    /// Gets the package records for the given registry log length.
    // NOTE(review): presumably returns records after `since` (or from the start
    // of the log when `None`), at most `limit` of them — confirm.
    async fn get_package_records(
        &self,
        log_id: &LogId,
        registry_log_length: RegistryLen,
        since: Option<&RecordId>,
        limit: u16,
    ) -> Result<Vec<PublishedProtoEnvelope<package::PackageRecord>>, DataStoreError>;

    /// Gets an operator record, together with its status and registry index.
    async fn get_operator_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
    ) -> Result<Record<operator::OperatorRecord>, DataStoreError>;

    /// Gets a package record, together with its status and registry index.
    async fn get_package_record(
        &self,
        log_id: &LogId,
        record_id: &RecordId,
    ) -> Result<Record<package::PackageRecord>, DataStoreError>;

    /// Verifies the signature of a package record.
    ///
    /// Only the signature on the envelope is verified. This method does not
    /// attempt to validate the record's contents.
    async fn verify_package_record_signature(
        &self,
        log_id: &LogId,
        record: &ProtoEnvelope<package::PackageRecord>,
    ) -> Result<(), DataStoreError>;

    /// Verifies that the package name is unique in a case-insensitive way, and
    /// that the package namespace is defined for this registry and is not
    /// imported from another registry.
    async fn verify_can_publish_package(
        &self,
        operator_log_id: &LogId,
        package_name: &PackageName,
    ) -> Result<(), DataStoreError>;

    /// Verifies the signature on the given `TimestampedCheckpoint` envelope.
    async fn verify_timestamped_checkpoint_signature(
        &self,
        operator_log_id: &LogId,
        ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
    ) -> Result<(), DataStoreError>;

    /// Returns a list of package names, for debugging only.
    ///
    /// Only compiled with the `debug` feature. The default implementation
    /// returns a "not implemented" error.
    #[cfg(feature = "debug")]
    #[doc(hidden)]
    async fn debug_list_package_names(&self) -> anyhow::Result<Vec<PackageName>> {
        anyhow::bail!("not implemented")
    }
}