1use std::{
2 sync::Arc,
3 time::{Duration, SystemTime},
4};
5
6use futures::{pin_mut, StreamExt};
7use indexmap::IndexMap;
8use thiserror::Error;
9use tokio::{
10 sync::{mpsc, RwLock},
11 task::JoinHandle,
12 time::MissedTickBehavior,
13};
14use warg_crypto::{
15 hash::{AnyHash, Hash, Sha256, SupportedDigest},
16 signing::PrivateKey,
17};
18use warg_protocol::{
19 operator,
20 registry::{
21 Checkpoint, LogId, LogLeaf, MapLeaf, RecordId, RegistryIndex, RegistryLen,
22 TimestampedCheckpoint,
23 },
24 ProtoEnvelope, SerdeEnvelope,
25};
26use warg_transparency::{
27 log::{LogBuilder, LogData, LogProofBundle, Node, VecLog},
28 map::{Map, MapProofBundle},
29};
30
31use crate::datastore::{DataStore, DataStoreError};
32
#[derive(Clone)]
pub struct CoreService<Digest: SupportedDigest = Sha256> {
    /// Shared registry state and data store; also held by the background
    /// state-update task spawned in `start`.
    inner: Arc<Inner<Digest>>,

    /// Sends package log entries to the background task for validation,
    /// commit, and inclusion in the next checkpoint.
    submit_entry_tx: mpsc::Sender<LogLeaf>,
}
40
41impl<Digest: SupportedDigest> CoreService<Digest> {
42 pub async fn start(
46 operator_key: PrivateKey,
47 namespaces: Option<Vec<(String, operator::NamespaceState)>>,
48 store: Box<dyn DataStore>,
49 checkpoint_interval: Duration,
50 ) -> Result<(Self, JoinHandle<()>), CoreServiceError> {
51 let mut inner = Inner {
53 operator_key,
54 store,
55 state: Default::default(),
56 };
57 inner.initialize(namespaces).await?;
58
59 let inner = Arc::new(inner);
61 let (submit_entry_tx, submit_entry_rx) = tokio::sync::mpsc::channel(4);
62 let handle = tokio::spawn(
63 inner
64 .clone()
65 .process_state_updates(submit_entry_rx, checkpoint_interval),
66 );
67
68 let svc = Self {
69 inner,
70 submit_entry_tx,
71 };
72 Ok((svc, handle))
73 }
74
75 pub async fn log_consistency_proof(
77 &self,
78 from_log_length: RegistryLen,
79 to_log_length: RegistryLen,
80 ) -> Result<LogProofBundle<Digest, LogLeaf>, CoreServiceError> {
81 let state = self.inner.state.read().await;
82
83 let proof = state.log.prove_consistency(from_log_length, to_log_length);
84 LogProofBundle::bundle(vec![proof], vec![], &state.log)
85 .map_err(CoreServiceError::BundleFailure)
86 }
87
88 pub async fn log_inclusion_proofs(
90 &self,
91 log_length: RegistryLen,
92 entries: &[RegistryIndex],
93 ) -> Result<LogProofBundle<Digest, LogLeaf>, CoreServiceError> {
94 let state = self.inner.state.read().await;
95
96 let proofs = entries
97 .iter()
98 .map(|&index| {
99 let node = if index < state.leaf_index.len() as RegistryIndex {
100 state.leaf_index[index]
101 } else {
102 return Err(CoreServiceError::LeafNotFound(index));
103 };
104 Ok(state.log.prove_inclusion(node, log_length))
105 })
106 .collect::<Result<Vec<_>, CoreServiceError>>()?;
107
108 LogProofBundle::bundle(vec![], proofs, &state.log).map_err(CoreServiceError::BundleFailure)
109 }
110
111 pub async fn map_inclusion_proofs(
113 &self,
114 log_length: RegistryLen,
115 entries: &[RegistryIndex],
116 ) -> Result<MapProofBundle<Digest, LogId, MapLeaf>, CoreServiceError> {
117 let state = self.inner.state.read().await;
118
119 let (map_root, map) = state
120 .map_index
121 .get(&log_length)
122 .ok_or_else(|| CoreServiceError::CheckpointNotFound(log_length))?;
123
124 let indexes = self
125 .inner
126 .store
127 .get_log_leafs_with_registry_index(entries)
128 .await
129 .map_err(CoreServiceError::DataStore)?;
130
131 let proofs = indexes
132 .iter()
133 .map(|log_leaf| {
134 let LogLeaf { log_id, record_id } = log_leaf;
135
136 let proof = map
137 .prove(log_id.clone())
138 .ok_or_else(|| CoreServiceError::PackageNotIncluded(log_id.clone()))?;
139
140 let map_leaf = MapLeaf {
141 record_id: record_id.clone(),
142 };
143 let found_root = proof.evaluate(log_id, &map_leaf);
144 if &found_root != map_root {
145 return Err(CoreServiceError::IncorrectProof {
146 root: map_root.into(),
147 found: found_root.into(),
148 });
149 }
150
151 Ok(proof)
152 })
153 .collect::<Result<Vec<_>, CoreServiceError>>()?;
154
155 Ok(MapProofBundle::bundle(proofs))
156 }
157
158 pub fn store(&self) -> &dyn DataStore {
160 self.inner.store.as_ref()
161 }
162
163 pub async fn submit_package_record(&self, log_id: LogId, record_id: RecordId) {
165 self.submit_entry_tx
166 .send(LogLeaf { log_id, record_id })
167 .await
168 .unwrap()
169 }
170}
171
struct Inner<Digest: SupportedDigest> {
    /// Key used to sign operator records and timestamped checkpoints.
    operator_key: PrivateKey,

    /// Persistent storage for records and checkpoints.
    store: Box<dyn DataStore>,

    /// In-memory transparency log/map state; read-locked by the proof
    /// endpoints and write-locked by the state-update task.
    state: RwLock<State<Digest>>,
}
182
impl<Digest: SupportedDigest> Inner<Digest> {
    /// Initializes in-memory state from the data store.
    ///
    /// If no validated records exist yet, delegates to `initialize_new` to
    /// create the operator log; otherwise replays every published record and
    /// cross-checks recomputed checkpoints against the stored ones.
    async fn initialize(
        &mut self,
        namespaces: Option<Vec<(String, operator::NamespaceState)>>,
    ) -> Result<(), CoreServiceError> {
        tracing::debug!("Initializing CoreService");

        let published = self.store.get_all_validated_records().await?.peekable();
        pin_mut!(published);

        // An empty record stream means a brand-new registry.
        if published.as_mut().peek().await.is_none() {
            tracing::debug!("No existing records; initializing new state");
            return self.initialize_new(namespaces).await;
        }

        // Index stored checkpoints by log length for verification below.
        let mut checkpoints = self.store.get_all_checkpoints().await?;
        let mut checkpoints_by_len: IndexMap<RegistryLen, Checkpoint> = Default::default();
        while let Some(checkpoint) = checkpoints.next().await {
            let checkpoint = checkpoint?.checkpoint;
            checkpoints_by_len.insert(checkpoint.log_length, checkpoint);
        }

        // Replay records; whenever the log reaches a stored checkpoint's
        // length, recompute the checkpoint and assert it matches. A mismatch
        // means the store and the log computation disagree, so the hard
        // `assert!` (process abort) appears deliberate.
        let state = self.state.get_mut();
        while let Some(entry) = published.next().await {
            state.push_entry(entry?);
            if let Some(stored_checkpoint) =
                checkpoints_by_len.get(&(state.log.length() as RegistryLen))
            {
                let computed_checkpoint = state.checkpoint();
                assert!(stored_checkpoint == &computed_checkpoint);
            }
        }

        Ok(())
    }

    /// Creates and persists the initial operator record (an `Init` entry plus
    /// one entry per configured namespace), then produces the first signed
    /// checkpoint.
    async fn initialize_new(
        &mut self,
        namespaces: Option<Vec<(String, operator::NamespaceState)>>,
    ) -> Result<(), CoreServiceError> {
        let state = self.state.get_mut();

        let init = operator::OperatorEntry::Init {
            hash_algorithm: Digest::ALGORITHM,
            key: self.operator_key.public_key(),
        };
        // The first record always begins with `Init`, optionally followed by
        // namespace definitions/imports.
        let entries = if let Some(namespaces) = namespaces {
            let mut entries = Vec::with_capacity(1 + namespaces.len());
            entries.push(init);
            for (namespace, state) in namespaces.into_iter() {
                entries.push(match state {
                    operator::NamespaceState::Defined => {
                        operator::OperatorEntry::DefineNamespace { namespace }
                    }
                    operator::NamespaceState::Imported { registry } => {
                        operator::OperatorEntry::ImportNamespace {
                            namespace,
                            registry,
                        }
                    }
                });
            }
            entries
        } else {
            vec![init]
        };

        let init_record = operator::OperatorRecord {
            prev: None,
            version: 0,
            timestamp: SystemTime::now(),
            entries,
        };
        // Signing a freshly-built record with our own key should not fail;
        // the unwrap treats failure as a bug.
        let signed_init_record =
            ProtoEnvelope::signed_contents(&self.operator_key, init_record).unwrap();
        let log_id = LogId::operator_log::<Digest>();
        let record_id = RecordId::operator_record::<Digest>(&signed_init_record);

        // Persist and commit the operator record before mutating in-memory
        // state, keeping the store the source of truth.
        self.store
            .store_operator_record(&log_id, &record_id, &signed_init_record)
            .await?;
        self.store
            .commit_operator_record(&log_id, &record_id, 0)
            .await?;

        state.push_entry(LogLeaf { log_id, record_id });

        // Seed with an empty (length 0) checkpoint; `update_checkpoint`
        // notices the length mismatch and replaces it with the real first
        // checkpoint before signing and storing it.
        let mut checkpoint = Checkpoint {
            log_root: Hash::<Digest>::default().into(),
            log_length: 0,
            map_root: Hash::<Digest>::default().into(),
        };
        self.update_checkpoint(&mut checkpoint).await;

        Ok(())
    }

    /// Background loop: processes submitted package entries and emits a
    /// signed checkpoint on every interval tick. Exits when all senders are
    /// dropped.
    async fn process_state_updates(
        self: Arc<Self>,
        mut submit_entry_rx: mpsc::Receiver<LogLeaf>,
        checkpoint_interval: Duration,
    ) {
        // Resume from the most recent stored checkpoint; `initialize` runs
        // before this task, so one is expected to exist — the unwrap treats
        // its absence (or a store failure here) as fatal.
        let mut checkpoint = self
            .store
            .get_latest_checkpoint()
            .await
            .unwrap()
            .into_contents()
            .checkpoint;

        // Delay (rather than burst) ticks missed while busy.
        let mut checkpoint_interval = tokio::time::interval(checkpoint_interval);
        checkpoint_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                entry = submit_entry_rx.recv() => match entry {
                    Some(entry) => self.process_package_entry(&entry).await,
                    // Channel closed: all service handles were dropped.
                    None => break, },
                _ = checkpoint_interval.tick() => self.update_checkpoint(&mut checkpoint).await,
            }
        }
    }

    /// Validates and commits a single package record via the store, then
    /// appends it to the in-memory log/map. Rejected records are logged at
    /// debug level and dropped.
    async fn process_package_entry(&self, entry: &LogLeaf) {
        tracing::debug!("Processing entry {entry:?}");

        let mut state = self.state.write().await;
        let LogLeaf { log_id, record_id } = entry;

        // The record's registry index is the log position it will occupy.
        let registry_index = state.log.length() as RegistryIndex;
        let commit_res = self
            .store
            .commit_package_record(log_id, record_id, registry_index)
            .await;

        if let Err(err) = commit_res {
            match err {
                // Validation failures are expected for bad submissions.
                DataStoreError::Rejection(_)
                | DataStoreError::OperatorValidationFailed(_)
                | DataStoreError::PackageValidationFailed(_) => {
                    tracing::debug!("record `{record_id}` rejected: {err:?}");
                }
                // Anything else (e.g. a storage failure) is unexpected.
                e => {
                    tracing::error!("failed to validate package record `{record_id}`: {e}");
                }
            }
            return;
        }

        state.push_entry(entry.clone());
    }

    /// Recomputes `checkpoint` if the log has grown since it was taken, then
    /// signs and stores it. Note the checkpoint is re-signed and re-stored
    /// even when unchanged — presumably to refresh its timestamp; confirm
    /// against consumers of stored checkpoints.
    async fn update_checkpoint(&self, checkpoint: &mut Checkpoint) {
        // Scope the write lock so it is released before the (awaited) store.
        {
            let mut state = self.state.write().await;
            if state.log.length() as RegistryLen != checkpoint.log_length {
                *checkpoint = state.checkpoint();
                tracing::debug!("Updating to checkpoint {checkpoint:?}");
            }
        }

        // Storage errors are logged, not fatal; the next tick retries.
        if let Err(err) = self.sign_and_store_checkpoint(checkpoint.clone()).await {
            tracing::error!("Error storing checkpoint {checkpoint:?}: {err:?}");
        }
    }

    /// Timestamps `checkpoint`, signs it with the operator key, and persists
    /// it keyed by the hash of its contents.
    async fn sign_and_store_checkpoint(&self, checkpoint: Checkpoint) -> anyhow::Result<()> {
        let checkpoint_id = Hash::<Digest>::of(&checkpoint).into();
        let timestamped = TimestampedCheckpoint::now(checkpoint.clone())?;
        let signed = SerdeEnvelope::signed_contents(&self.operator_key, timestamped)?;
        self.store.store_checkpoint(&checkpoint_id, signed).await?;
        Ok(())
    }
}
375
/// A verifiable map from log ID to the latest record ID in that log.
type VerifiableMap<Digest> = Map<Digest, LogId, MapLeaf>;

#[derive(Default)]
struct State<Digest: SupportedDigest> {
    /// Append-only transparency log of operator and package record leaves.
    log: VecLog<Digest, LogLeaf>,
    /// Log node for each leaf, positionally indexed; used to build inclusion
    /// proofs by registry index.
    leaf_index: Vec<Node>,

    /// Current verifiable map (log ID -> latest record ID).
    map: VerifiableMap<Digest>,
    /// Snapshot (root + full map clone) per checkpointed log length, retained
    /// so map inclusion proofs can be served for historical checkpoints.
    map_index: IndexMap<RegistryLen, (Hash<Digest>, VerifiableMap<Digest>)>,
}
390
391impl<Digest: SupportedDigest> State<Digest> {
392 fn push_entry(&mut self, log_leaf: LogLeaf) {
393 let node = self.log.push(&log_leaf);
394 self.leaf_index.push(node);
395
396 let LogLeaf { log_id, record_id } = log_leaf;
397 self.map = self.map.insert(log_id, MapLeaf { record_id });
398 }
399
400 fn checkpoint(&mut self) -> Checkpoint {
401 let log_checkpoint = self.log.checkpoint();
402 let map_root = self.map.root();
403 let log_length = log_checkpoint.length() as RegistryLen;
404
405 if log_length > 0 {
407 self.map_index
408 .insert(log_length, (map_root.clone(), self.map.clone()));
409 }
410
411 Checkpoint {
412 log_length,
413 log_root: log_checkpoint.root().into(),
414 map_root: map_root.into(),
415 }
416 }
417}
418
#[derive(Debug, Error)]
pub enum CoreServiceError {
    /// No map snapshot is retained for the requested checkpoint log length.
    #[error("checkpoint at log length `{0}` was not found")]
    CheckpointNotFound(RegistryLen),
    /// No log leaf exists at the requested registry index.
    #[error("log leaf `{0}` was not found")]
    LeafNotFound(RegistryIndex),
    /// Assembling a log proof bundle failed.
    #[error("failed to bundle proofs: `{0}`")]
    BundleFailure(anyhow::Error),
    /// The verifiable map has no entry for the package's log ID.
    #[error("failed to prove inclusion of package `{0}`")]
    PackageNotIncluded(LogId),
    /// A generated map proof did not evaluate back to the snapshot root.
    #[error("failed to prove inclusion: found root `{found}` but was given root `{root}`")]
    IncorrectProof { root: AnyHash, found: AnyHash },
    /// An underlying data store operation failed.
    #[error("data store error: {0}")]
    DataStore(#[from] DataStoreError),
    /// State initialization from the data store failed.
    #[error("initialization failed: {0}")]
    InitializationFailure(String),
}