warg_server/services/
core.rs

1use std::{
2    sync::Arc,
3    time::{Duration, SystemTime},
4};
5
6use futures::{pin_mut, StreamExt};
7use indexmap::IndexMap;
8use thiserror::Error;
9use tokio::{
10    sync::{mpsc, RwLock},
11    task::JoinHandle,
12    time::MissedTickBehavior,
13};
14use warg_crypto::{
15    hash::{AnyHash, Hash, Sha256, SupportedDigest},
16    signing::PrivateKey,
17};
18use warg_protocol::{
19    operator,
20    registry::{
21        Checkpoint, LogId, LogLeaf, MapLeaf, RecordId, RegistryIndex, RegistryLen,
22        TimestampedCheckpoint,
23    },
24    ProtoEnvelope, SerdeEnvelope,
25};
26use warg_transparency::{
27    log::{LogBuilder, LogData, LogProofBundle, Node, VecLog},
28    map::{Map, MapProofBundle},
29};
30
31use crate::datastore::{DataStore, DataStoreError};
32
33#[derive(Clone)]
34pub struct CoreService<Digest: SupportedDigest = Sha256> {
35    inner: Arc<Inner<Digest>>,
36
37    // Channel sender used by `submit_package_record` to serialize submissions.
38    submit_entry_tx: mpsc::Sender<LogLeaf>,
39}
40
41impl<Digest: SupportedDigest> CoreService<Digest> {
42    /// Starts the `CoreService`, returning a `clone`able handle to the
43    /// service and a [`JoinHandle`] which should be awaited after dropping all
44    /// copies of the service handle to allow for graceful shutdown.
45    pub async fn start(
46        operator_key: PrivateKey,
47        namespaces: Option<Vec<(String, operator::NamespaceState)>>,
48        store: Box<dyn DataStore>,
49        checkpoint_interval: Duration,
50    ) -> Result<(Self, JoinHandle<()>), CoreServiceError> {
51        // Build service
52        let mut inner = Inner {
53            operator_key,
54            store,
55            state: Default::default(),
56        };
57        inner.initialize(namespaces).await?;
58
59        // Spawn state update task
60        let inner = Arc::new(inner);
61        let (submit_entry_tx, submit_entry_rx) = tokio::sync::mpsc::channel(4);
62        let handle = tokio::spawn(
63            inner
64                .clone()
65                .process_state_updates(submit_entry_rx, checkpoint_interval),
66        );
67
68        let svc = Self {
69            inner,
70            submit_entry_tx,
71        };
72        Ok((svc, handle))
73    }
74
75    /// Constructs a log consistency proof between the given log tree roots.
76    pub async fn log_consistency_proof(
77        &self,
78        from_log_length: RegistryLen,
79        to_log_length: RegistryLen,
80    ) -> Result<LogProofBundle<Digest, LogLeaf>, CoreServiceError> {
81        let state = self.inner.state.read().await;
82
83        let proof = state.log.prove_consistency(from_log_length, to_log_length);
84        LogProofBundle::bundle(vec![proof], vec![], &state.log)
85            .map_err(CoreServiceError::BundleFailure)
86    }
87
88    /// Constructs log inclusion proofs for the given entries at the given log tree root.
89    pub async fn log_inclusion_proofs(
90        &self,
91        log_length: RegistryLen,
92        entries: &[RegistryIndex],
93    ) -> Result<LogProofBundle<Digest, LogLeaf>, CoreServiceError> {
94        let state = self.inner.state.read().await;
95
96        let proofs = entries
97            .iter()
98            .map(|&index| {
99                let node = if index < state.leaf_index.len() as RegistryIndex {
100                    state.leaf_index[index]
101                } else {
102                    return Err(CoreServiceError::LeafNotFound(index));
103                };
104                Ok(state.log.prove_inclusion(node, log_length))
105            })
106            .collect::<Result<Vec<_>, CoreServiceError>>()?;
107
108        LogProofBundle::bundle(vec![], proofs, &state.log).map_err(CoreServiceError::BundleFailure)
109    }
110
111    /// Constructs map inclusion proofs for the given entries at the given map tree root.
112    pub async fn map_inclusion_proofs(
113        &self,
114        log_length: RegistryLen,
115        entries: &[RegistryIndex],
116    ) -> Result<MapProofBundle<Digest, LogId, MapLeaf>, CoreServiceError> {
117        let state = self.inner.state.read().await;
118
119        let (map_root, map) = state
120            .map_index
121            .get(&log_length)
122            .ok_or_else(|| CoreServiceError::CheckpointNotFound(log_length))?;
123
124        let indexes = self
125            .inner
126            .store
127            .get_log_leafs_with_registry_index(entries)
128            .await
129            .map_err(CoreServiceError::DataStore)?;
130
131        let proofs = indexes
132            .iter()
133            .map(|log_leaf| {
134                let LogLeaf { log_id, record_id } = log_leaf;
135
136                let proof = map
137                    .prove(log_id.clone())
138                    .ok_or_else(|| CoreServiceError::PackageNotIncluded(log_id.clone()))?;
139
140                let map_leaf = MapLeaf {
141                    record_id: record_id.clone(),
142                };
143                let found_root = proof.evaluate(log_id, &map_leaf);
144                if &found_root != map_root {
145                    return Err(CoreServiceError::IncorrectProof {
146                        root: map_root.into(),
147                        found: found_root.into(),
148                    });
149                }
150
151                Ok(proof)
152            })
153            .collect::<Result<Vec<_>, CoreServiceError>>()?;
154
155        Ok(MapProofBundle::bundle(proofs))
156    }
157
158    /// Gets the data store associated with the transparency service.
159    pub fn store(&self) -> &dyn DataStore {
160        self.inner.store.as_ref()
161    }
162
163    /// Submits a package record to be processed.
164    pub async fn submit_package_record(&self, log_id: LogId, record_id: RecordId) {
165        self.submit_entry_tx
166            .send(LogLeaf { log_id, record_id })
167            .await
168            .unwrap()
169    }
170}
171
172struct Inner<Digest: SupportedDigest> {
173    // Operator signing key
174    operator_key: PrivateKey,
175
176    // DataStore persists transparency state.
177    store: Box<dyn DataStore>,
178
179    // In-memory transparency state.
180    state: RwLock<State<Digest>>,
181}
182
183impl<Digest: SupportedDigest> Inner<Digest> {
184    // Load state from DataStore or initialize empty state, returning any
185    // entries that are not yet part of a checkpoint.
186    async fn initialize(
187        &mut self,
188        namespaces: Option<Vec<(String, operator::NamespaceState)>>,
189    ) -> Result<(), CoreServiceError> {
190        tracing::debug!("Initializing CoreService");
191
192        let published = self.store.get_all_validated_records().await?.peekable();
193        pin_mut!(published);
194
195        // If there are no published records, initialize a new state
196        if published.as_mut().peek().await.is_none() {
197            tracing::debug!("No existing records; initializing new state");
198            return self.initialize_new(namespaces).await;
199        }
200
201        // Reconstruct internal state from previously-stored data
202        let mut checkpoints = self.store.get_all_checkpoints().await?;
203        let mut checkpoints_by_len: IndexMap<RegistryLen, Checkpoint> = Default::default();
204        while let Some(checkpoint) = checkpoints.next().await {
205            let checkpoint = checkpoint?.checkpoint;
206            checkpoints_by_len.insert(checkpoint.log_length, checkpoint);
207        }
208
209        let state = self.state.get_mut();
210        while let Some(entry) = published.next().await {
211            state.push_entry(entry?);
212            if let Some(stored_checkpoint) =
213                checkpoints_by_len.get(&(state.log.length() as RegistryLen))
214            {
215                // Validate stored checkpoint (and update internal state as a side-effect)
216                let computed_checkpoint = state.checkpoint();
217                assert!(stored_checkpoint == &computed_checkpoint);
218            }
219        }
220
221        Ok(())
222    }
223
224    async fn initialize_new(
225        &mut self,
226        namespaces: Option<Vec<(String, operator::NamespaceState)>>,
227    ) -> Result<(), CoreServiceError> {
228        let state = self.state.get_mut();
229
230        // Construct operator init record
231        let init = operator::OperatorEntry::Init {
232            hash_algorithm: Digest::ALGORITHM,
233            key: self.operator_key.public_key(),
234        };
235        let entries = if let Some(namespaces) = namespaces {
236            let mut entries = Vec::with_capacity(1 + namespaces.len());
237            entries.push(init);
238            for (namespace, state) in namespaces.into_iter() {
239                entries.push(match state {
240                    operator::NamespaceState::Defined => {
241                        operator::OperatorEntry::DefineNamespace { namespace }
242                    }
243                    operator::NamespaceState::Imported { registry } => {
244                        operator::OperatorEntry::ImportNamespace {
245                            namespace,
246                            registry,
247                        }
248                    }
249                });
250            }
251            entries
252        } else {
253            vec![init]
254        };
255
256        let init_record = operator::OperatorRecord {
257            prev: None,
258            version: 0,
259            timestamp: SystemTime::now(),
260            entries,
261        };
262        let signed_init_record =
263            ProtoEnvelope::signed_contents(&self.operator_key, init_record).unwrap();
264        let log_id = LogId::operator_log::<Digest>();
265        let record_id = RecordId::operator_record::<Digest>(&signed_init_record);
266
267        // Store init record
268        self.store
269            .store_operator_record(&log_id, &record_id, &signed_init_record)
270            .await?;
271        self.store
272            .commit_operator_record(&log_id, &record_id, 0)
273            .await?;
274
275        // Update state with init record
276        state.push_entry(LogLeaf { log_id, record_id });
277
278        // "zero" checkpoint to be updated
279        let mut checkpoint = Checkpoint {
280            log_root: Hash::<Digest>::default().into(),
281            log_length: 0,
282            map_root: Hash::<Digest>::default().into(),
283        };
284        self.update_checkpoint(&mut checkpoint).await;
285
286        Ok(())
287    }
288
289    // Runs the service's state update loop.
290    async fn process_state_updates(
291        self: Arc<Self>,
292        mut submit_entry_rx: mpsc::Receiver<LogLeaf>,
293        checkpoint_interval: Duration,
294    ) {
295        let mut checkpoint = self
296            .store
297            .get_latest_checkpoint()
298            .await
299            .unwrap()
300            .into_contents()
301            .checkpoint;
302
303        let mut checkpoint_interval = tokio::time::interval(checkpoint_interval);
304        checkpoint_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
305
306        loop {
307            tokio::select! {
308                entry = submit_entry_rx.recv() => match entry {
309                    Some(entry) => self.process_package_entry(&entry).await,
310                    None => break, // Channel closed
311                },
312                _ = checkpoint_interval.tick() => self.update_checkpoint(&mut checkpoint).await,
313            }
314        }
315    }
316
317    // Processes a submitted package entry
318    async fn process_package_entry(&self, entry: &LogLeaf) {
319        tracing::debug!("Processing entry {entry:?}");
320
321        let mut state = self.state.write().await;
322        let LogLeaf { log_id, record_id } = entry;
323
324        // Validate and commit the package entry to the store
325        let registry_index = state.log.length() as RegistryIndex;
326        let commit_res = self
327            .store
328            .commit_package_record(log_id, record_id, registry_index)
329            .await;
330
331        if let Err(err) = commit_res {
332            match err {
333                DataStoreError::Rejection(_)
334                | DataStoreError::OperatorValidationFailed(_)
335                | DataStoreError::PackageValidationFailed(_) => {
336                    // The record failed to validate and was rejected; do not include it in the next checkpoint
337                    tracing::debug!("record `{record_id}` rejected: {err:?}");
338                }
339                e => {
340                    // TODO: this should be made more robust with a proper reliable message
341                    // queue with retry logic
342                    tracing::error!("failed to validate package record `{record_id}`: {e}");
343                }
344            }
345            return;
346        }
347
348        state.push_entry(entry.clone());
349    }
350
351    // Store a checkpoint including the given new entries
352    async fn update_checkpoint(&self, checkpoint: &mut Checkpoint) {
353        {
354            // Recalculate the checkpoint if necessary
355            let mut state = self.state.write().await;
356            if state.log.length() as RegistryLen != checkpoint.log_length {
357                *checkpoint = state.checkpoint();
358                tracing::debug!("Updating to checkpoint {checkpoint:?}");
359            }
360        }
361
362        if let Err(err) = self.sign_and_store_checkpoint(checkpoint.clone()).await {
363            tracing::error!("Error storing checkpoint {checkpoint:?}: {err:?}");
364        }
365    }
366
367    async fn sign_and_store_checkpoint(&self, checkpoint: Checkpoint) -> anyhow::Result<()> {
368        let checkpoint_id = Hash::<Digest>::of(&checkpoint).into();
369        let timestamped = TimestampedCheckpoint::now(checkpoint.clone())?;
370        let signed = SerdeEnvelope::signed_contents(&self.operator_key, timestamped)?;
371        self.store.store_checkpoint(&checkpoint_id, signed).await?;
372        Ok(())
373    }
374}
375
376type VerifiableMap<Digest> = Map<Digest, LogId, MapLeaf>;
377
378#[derive(Default)]
379struct State<Digest: SupportedDigest> {
380    // The verifiable log of all package log entries
381    log: VecLog<Digest, LogLeaf>,
382    // Index log tree nodes by registry log index of the record
383    leaf_index: Vec<Node>,
384
385    // The verifiable map of package logs' latest entries (log_id -> record_id)
386    map: VerifiableMap<Digest>,
387    // Index verifiable map snapshots by log length (at checkpoints only)
388    map_index: IndexMap<RegistryLen, (Hash<Digest>, VerifiableMap<Digest>)>,
389}
390
391impl<Digest: SupportedDigest> State<Digest> {
392    fn push_entry(&mut self, log_leaf: LogLeaf) {
393        let node = self.log.push(&log_leaf);
394        self.leaf_index.push(node);
395
396        let LogLeaf { log_id, record_id } = log_leaf;
397        self.map = self.map.insert(log_id, MapLeaf { record_id });
398    }
399
400    fn checkpoint(&mut self) -> Checkpoint {
401        let log_checkpoint = self.log.checkpoint();
402        let map_root = self.map.root();
403        let log_length = log_checkpoint.length() as RegistryLen;
404
405        // Update map snapshot
406        if log_length > 0 {
407            self.map_index
408                .insert(log_length, (map_root.clone(), self.map.clone()));
409        }
410
411        Checkpoint {
412            log_length,
413            log_root: log_checkpoint.root().into(),
414            map_root: map_root.into(),
415        }
416    }
417}
418
419#[derive(Debug, Error)]
420pub enum CoreServiceError {
421    #[error("checkpoint at log length `{0}` was not found")]
422    CheckpointNotFound(RegistryLen),
423    #[error("log leaf `{0}` was not found")]
424    LeafNotFound(RegistryIndex),
425    #[error("failed to bundle proofs: `{0}`")]
426    BundleFailure(anyhow::Error),
427    #[error("failed to prove inclusion of package `{0}`")]
428    PackageNotIncluded(LogId),
429    #[error("failed to prove inclusion: found root `{found}` but was given root `{root}`")]
430    IncorrectProof { root: AnyHash, found: AnyHash },
431    #[error("data store error: {0}")]
432    DataStore(#[from] DataStoreError),
433    #[error("initialization failed: {0}")]
434    InitializationFailure(String),
435}