1use futures::Stream;
2use indexmap::{IndexMap, IndexSet};
3use std::pin::Pin;
4use thiserror::Error;
5use warg_crypto::{
6 hash::AnyHash,
7 signing::{KeyID, Signature},
8};
9use warg_protocol::{
10 operator, package,
11 registry::{
12 LogId, LogLeaf, PackageName, RecordId, RegistryIndex, RegistryLen, TimestampedCheckpoint,
13 },
14 ProtoEnvelope, PublishedProtoEnvelope, SerdeEnvelope,
15};
16
17mod memory;
18#[cfg(feature = "postgres")]
19mod postgres;
20
21pub use memory::*;
22#[cfg(feature = "postgres")]
23pub use postgres::*;
24
25#[derive(Debug, Error)]
26pub enum DataStoreError {
27 #[error("a conflicting operation was processed: update to the latest checkpoint and try the operation again")]
28 Conflict,
29
30 #[error("checkpoint log length `{0}` was not found")]
31 CheckpointNotFound(RegistryLen),
32
33 #[error("log `{0}` was not found")]
34 LogNotFound(LogId),
35
36 #[error("record `{0}` was not found")]
37 RecordNotFound(RecordId),
38
39 #[error("log leaf {0} was not found")]
40 LogLeafNotFound(RegistryIndex),
41
42 #[error("record `{0}` cannot be validated as it is not in a pending state")]
43 RecordNotPending(RecordId),
44
45 #[error("contents for record `{record_id}` are invalid: {message}")]
46 InvalidRecordContents {
47 record_id: RecordId,
48 message: String,
49 },
50
51 #[error("the operator record was invalid: {0}")]
52 OperatorValidationFailed(#[from] operator::ValidationError),
53
54 #[error("the package record was invalid: {0}")]
55 PackageValidationFailed(#[from] package::ValidationError),
56
57 #[error("the package namespace `{0}` is not defined")]
58 PackageNamespaceNotDefined(String),
59
60 #[error(
61 "the package namespace `{0}` is imported from another registry and cannot accept publishes"
62 )]
63 PackageNamespaceImported(String),
64
65 #[error("key id `{0}` does not have permission")]
66 KeyUnauthorized(KeyID),
67
68 #[error("unknown key id `{0}`")]
69 UnknownKey(KeyID),
70
71 #[error("signature `{0}` verification failed")]
72 SignatureVerificationFailed(Signature),
73
74 #[error("the record was rejected: {0}")]
75 Rejection(String),
76
77 #[cfg(feature = "postgres")]
78 #[error("a connection could not be established to the PostgreSQL server: {0}")]
79 ConnectionPool(#[from] diesel_async::pooled_connection::deadpool::PoolError),
80
81 #[cfg(feature = "postgres")]
82 #[error(transparent)]
83 Diesel(#[from] diesel::result::Error),
84}
85
86#[derive(Debug, Clone, Eq, PartialEq)]
88pub enum RecordStatus {
89 MissingContent(Vec<AnyHash>),
91 Pending,
93 Rejected(String),
95 Validated,
97 Published,
99}
100
101pub struct Record<T>
103where
104 T: Clone,
105{
106 pub status: RecordStatus,
108 pub envelope: ProtoEnvelope<T>,
110 pub registry_index: Option<RegistryIndex>,
114}
115
116#[axum::async_trait]
118pub trait DataStore: Send + Sync {
119 async fn get_all_checkpoints(
123 &self,
124 ) -> Result<
125 Pin<Box<dyn Stream<Item = Result<TimestampedCheckpoint, DataStoreError>> + Send>>,
126 DataStoreError,
127 >;
128
129 async fn get_all_validated_records(
133 &self,
134 ) -> Result<Pin<Box<dyn Stream<Item = Result<LogLeaf, DataStoreError>> + Send>>, DataStoreError>;
135
136 async fn get_log_leafs_with_registry_index(
138 &self,
139 entries: &[RegistryIndex],
140 ) -> Result<Vec<LogLeaf>, DataStoreError>;
141
142 async fn store_operator_record(
144 &self,
145 log_id: &LogId,
146 record_id: &RecordId,
147 record: &ProtoEnvelope<operator::OperatorRecord>,
148 ) -> Result<(), DataStoreError>;
149
150 async fn reject_operator_record(
154 &self,
155 log_id: &LogId,
156 record_id: &RecordId,
157 reason: &str,
158 ) -> Result<(), DataStoreError>;
159
160 async fn commit_operator_record(
166 &self,
167 log_id: &LogId,
168 record_id: &RecordId,
169 registry_index: RegistryIndex,
170 ) -> Result<(), DataStoreError>;
171
172 async fn store_package_record(
177 &self,
178 log_id: &LogId,
179 package_name: &PackageName,
180 record_id: &RecordId,
181 record: &ProtoEnvelope<package::PackageRecord>,
182 missing: &IndexSet<&AnyHash>,
183 ) -> Result<(), DataStoreError>;
184
185 async fn reject_package_record(
189 &self,
190 log_id: &LogId,
191 record_id: &RecordId,
192 reason: &str,
193 ) -> Result<(), DataStoreError>;
194
195 async fn commit_package_record(
201 &self,
202 log_id: &LogId,
203 record_id: &RecordId,
204 registry_index: RegistryIndex,
205 ) -> Result<(), DataStoreError>;
206
207 async fn is_content_missing(
211 &self,
212 log_id: &LogId,
213 record_id: &RecordId,
214 digest: &AnyHash,
215 ) -> Result<bool, DataStoreError>;
216
217 async fn set_content_present(
226 &self,
227 log_id: &LogId,
228 record_id: &RecordId,
229 digest: &AnyHash,
230 ) -> Result<bool, DataStoreError>;
231
232 async fn store_checkpoint(
234 &self,
235 checkpoint_id: &AnyHash,
236 ts_checkpoint: SerdeEnvelope<TimestampedCheckpoint>,
237 ) -> Result<(), DataStoreError>;
238
239 async fn get_latest_checkpoint(
241 &self,
242 ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;
243
244 async fn get_checkpoint(
246 &self,
247 log_length: RegistryLen,
248 ) -> Result<SerdeEnvelope<TimestampedCheckpoint>, DataStoreError>;
249
250 async fn get_package_names(
252 &self,
253 log_ids: &[LogId],
254 ) -> Result<IndexMap<LogId, Option<PackageName>>, DataStoreError>;
255
256 async fn get_log_leafs_starting_with_registry_index(
258 &self,
259 starting_index: RegistryIndex,
260 limit: usize,
261 ) -> Result<Vec<(RegistryIndex, LogLeaf)>, DataStoreError>;
262
263 async fn get_operator_records(
265 &self,
266 log_id: &LogId,
267 registry_log_length: RegistryLen,
268 since: Option<&RecordId>,
269 limit: u16,
270 ) -> Result<Vec<PublishedProtoEnvelope<operator::OperatorRecord>>, DataStoreError>;
271
272 async fn get_package_records(
274 &self,
275 log_id: &LogId,
276 registry_log_length: RegistryLen,
277 since: Option<&RecordId>,
278 limit: u16,
279 ) -> Result<Vec<PublishedProtoEnvelope<package::PackageRecord>>, DataStoreError>;
280
281 async fn get_operator_record(
283 &self,
284 log_id: &LogId,
285 record_id: &RecordId,
286 ) -> Result<Record<operator::OperatorRecord>, DataStoreError>;
287
288 async fn get_package_record(
290 &self,
291 log_id: &LogId,
292 record_id: &RecordId,
293 ) -> Result<Record<package::PackageRecord>, DataStoreError>;
294
295 async fn verify_package_record_signature(
302 &self,
303 log_id: &LogId,
304 record: &ProtoEnvelope<package::PackageRecord>,
305 ) -> Result<(), DataStoreError>;
306
307 async fn verify_can_publish_package(
311 &self,
312 operator_log_id: &LogId,
313 package_name: &PackageName,
314 ) -> Result<(), DataStoreError>;
315
316 async fn verify_timestamped_checkpoint_signature(
318 &self,
319 operator_log_id: &LogId,
320 ts_checkpoint: &SerdeEnvelope<TimestampedCheckpoint>,
321 ) -> Result<(), DataStoreError>;
322
323 #[cfg(feature = "debug")]
325 #[doc(hidden)]
326 async fn debug_list_package_names(&self) -> anyhow::Result<Vec<PackageName>> {
327 anyhow::bail!("not implemented")
328 }
329}