casper_storage/global_state/state/
lmdb.rs

1use itertools::Itertools;
2use std::{ops::Deref, sync::Arc};
3use tracing::{error, warn};
4
5use lmdb::{DatabaseFlags, RwTransaction};
6
7use tempfile::TempDir;
8
9use casper_types::{
10    execution::{Effects, TransformKindV2, TransformV2},
11    global_state::TrieMerkleProof,
12    Digest, Key, StoredValue,
13};
14
15use super::CommitError;
16use crate::{
17    data_access_layer::{
18        DataAccessLayer, FlushRequest, FlushResult, PutTrieRequest, PutTrieResult, TrieElement,
19        TrieRequest, TrieResult,
20    },
21    global_state::{
22        error::Error as GlobalStateError,
23        state::{
24            commit, put_stored_values, scratch::ScratchGlobalState, CommitProvider,
25            ScratchProvider, StateProvider, StateReader,
26        },
27        store::Store,
28        transaction_source::{lmdb::LmdbEnvironment, Transaction, TransactionSource},
29        trie::{operations::create_hashed_empty_trie, Trie, TrieRaw},
30        trie_store::{
31            lmdb::{LmdbTrieStore, ScratchTrieStore},
32            operations::{
33                keys_with_prefix, missing_children, prune, put_trie, read, read_with_proof,
34                ReadResult, TriePruneResult,
35            },
36        },
37        DEFAULT_ENABLE_ENTITY, DEFAULT_MAX_DB_SIZE, DEFAULT_MAX_QUERY_DEPTH, DEFAULT_MAX_READERS,
38    },
39    tracking_copy::TrackingCopy,
40};
41
42/// Global state implemented against LMDB as a backing data store.
43pub struct LmdbGlobalState {
44    /// Environment for LMDB.
45    pub(crate) environment: Arc<LmdbEnvironment>,
46    /// Trie store held within LMDB.
47    pub(crate) trie_store: Arc<LmdbTrieStore>,
48    /// Empty root hash used for a new trie.
49    pub(crate) empty_root_hash: Digest,
50    /// Max query depth
51    pub max_query_depth: u64,
52    /// Enable the addressable entity and migrate accounts/contracts to entities.
53    pub enable_entity: bool,
54}
55
56/// Represents a "view" of global state at a particular root hash.
57pub struct LmdbGlobalStateView {
58    /// Environment for LMDB.
59    pub(crate) environment: Arc<LmdbEnvironment>,
60    /// Trie store held within LMDB.
61    pub(crate) store: Arc<LmdbTrieStore>,
62    /// Root hash of this "view".
63    pub(crate) root_hash: Digest,
64}
65
66impl LmdbGlobalState {
67    /// Creates an empty state from an existing environment and trie_store.
68    pub fn empty(
69        environment: Arc<LmdbEnvironment>,
70        trie_store: Arc<LmdbTrieStore>,
71        max_query_depth: u64,
72        enable_entity: bool,
73    ) -> Result<Self, GlobalStateError> {
74        let root_hash: Digest = {
75            let (root_hash, root) = compute_empty_root_hash()?;
76            let mut txn = environment.create_read_write_txn()?;
77            trie_store.put(&mut txn, &root_hash, &root)?;
78            txn.commit()?;
79            environment.env().sync(true)?;
80            root_hash
81        };
82        Ok(LmdbGlobalState::new(
83            environment,
84            trie_store,
85            root_hash,
86            max_query_depth,
87            enable_entity,
88        ))
89    }
90
91    /// Creates a state from an existing environment, store, and root_hash.
92    /// Intended to be used for testing.
93    pub fn new(
94        environment: Arc<LmdbEnvironment>,
95        trie_store: Arc<LmdbTrieStore>,
96        empty_root_hash: Digest,
97        max_query_depth: u64,
98        enable_entity: bool,
99    ) -> Self {
100        LmdbGlobalState {
101            environment,
102            trie_store,
103            empty_root_hash,
104            max_query_depth,
105            enable_entity,
106        }
107    }
108
109    /// Creates an in-memory cache for changes written.
110    pub fn create_scratch(&self) -> ScratchGlobalState {
111        ScratchGlobalState::new(
112            Arc::clone(&self.environment),
113            Arc::clone(&self.trie_store),
114            self.empty_root_hash,
115            self.max_query_depth,
116            self.enable_entity,
117        )
118    }
119
120    /// Gets a scratch trie store.
121    pub(crate) fn get_scratch_store(&self) -> ScratchTrieStore {
122        ScratchTrieStore::new(Arc::clone(&self.trie_store), Arc::clone(&self.environment))
123    }
124
125    /// Write stored values to LMDB.
126    pub fn put_stored_values(
127        &self,
128        prestate_hash: Digest,
129        stored_values: Vec<(Key, StoredValue)>,
130    ) -> Result<Digest, GlobalStateError> {
131        let scratch_trie = self.get_scratch_store();
132        let new_state_root = put_stored_values::<_, _, GlobalStateError>(
133            &scratch_trie,
134            &scratch_trie,
135            prestate_hash,
136            stored_values,
137        )?;
138        scratch_trie.write_root_to_db(new_state_root)?;
139        Ok(new_state_root)
140    }
141
142    /// Get a reference to the lmdb global state's environment.
143    #[must_use]
144    pub fn environment(&self) -> &LmdbEnvironment {
145        &self.environment
146    }
147
148    /// Get a reference to the lmdb global state's trie store.
149    #[must_use]
150    pub fn trie_store(&self) -> &LmdbTrieStore {
151        &self.trie_store
152    }
153
154    /// Returns an initial, empty root hash of the underlying trie.
155    pub fn empty_state_root_hash(&self) -> Digest {
156        self.empty_root_hash
157    }
158}
159
160fn compute_empty_root_hash() -> Result<(Digest, Trie<Key, StoredValue>), GlobalStateError> {
161    let (root_hash, root) = create_hashed_empty_trie::<Key, StoredValue>()?;
162    Ok((root_hash, root))
163}
164
165impl StateReader<Key, StoredValue> for LmdbGlobalStateView {
166    type Error = GlobalStateError;
167
168    fn read(&self, key: &Key) -> Result<Option<StoredValue>, Self::Error> {
169        let txn = self.environment.create_read_txn()?;
170        let ret = match read::<Key, StoredValue, lmdb::RoTransaction, LmdbTrieStore, Self::Error>(
171            &txn,
172            self.store.deref(),
173            &self.root_hash,
174            key,
175        )? {
176            ReadResult::Found(value) => Some(value),
177            ReadResult::NotFound => None,
178            ReadResult::RootNotFound => panic!("LmdbGlobalState has invalid root"),
179        };
180        txn.commit()?;
181        Ok(ret)
182    }
183
184    fn read_with_proof(
185        &self,
186        key: &Key,
187    ) -> Result<Option<TrieMerkleProof<Key, StoredValue>>, Self::Error> {
188        let txn = self.environment.create_read_txn()?;
189        let ret = match read_with_proof::<
190            Key,
191            StoredValue,
192            lmdb::RoTransaction,
193            LmdbTrieStore,
194            Self::Error,
195        >(&txn, self.store.deref(), &self.root_hash, key)?
196        {
197            ReadResult::Found(value) => Some(value),
198            ReadResult::NotFound => None,
199            ReadResult::RootNotFound => panic!("LmdbGlobalState has invalid root"),
200        };
201        txn.commit()?;
202        Ok(ret)
203    }
204
205    fn keys_with_prefix(&self, prefix: &[u8]) -> Result<Vec<Key>, Self::Error> {
206        let txn = self.environment.create_read_txn()?;
207        let keys_iter = keys_with_prefix::<Key, StoredValue, _, _>(
208            &txn,
209            self.store.deref(),
210            &self.root_hash,
211            prefix,
212        );
213        let mut ret = Vec::new();
214        for result in keys_iter {
215            match result {
216                Ok(key) => ret.push(key),
217                Err(error) => return Err(error),
218            }
219        }
220        txn.commit()?;
221        Ok(ret)
222    }
223}
224
225impl CommitProvider for LmdbGlobalState {
226    fn commit_effects(
227        &self,
228        prestate_hash: Digest,
229        effects: Effects,
230    ) -> Result<Digest, GlobalStateError> {
231        commit::<LmdbEnvironment, LmdbTrieStore, GlobalStateError>(
232            &self.environment,
233            &self.trie_store,
234            prestate_hash,
235            effects,
236        )
237    }
238
239    fn commit_values(
240        &self,
241        prestate_hash: Digest,
242        values_to_write: Vec<(Key, StoredValue)>,
243        keys_to_prune: std::collections::BTreeSet<Key>,
244    ) -> Result<Digest, GlobalStateError> {
245        let post_write_hash = put_stored_values::<LmdbEnvironment, LmdbTrieStore, GlobalStateError>(
246            &self.environment,
247            &self.trie_store,
248            prestate_hash,
249            values_to_write,
250        )?;
251
252        let mut txn = self.environment.create_read_write_txn()?;
253
254        let maybe_root: Option<Trie<Key, StoredValue>> =
255            self.trie_store.get(&txn, &post_write_hash)?;
256
257        if maybe_root.is_none() {
258            return Err(CommitError::RootNotFound(post_write_hash).into());
259        };
260
261        let mut state_hash = post_write_hash;
262
263        for key in keys_to_prune.into_iter() {
264            let prune_result = prune::<Key, StoredValue, _, LmdbTrieStore, GlobalStateError>(
265                &mut txn,
266                &self.trie_store,
267                &state_hash,
268                &key,
269            )?;
270
271            match prune_result {
272                TriePruneResult::Pruned(root_hash) => {
273                    state_hash = root_hash;
274                }
275                TriePruneResult::MissingKey => {
276                    warn!("commit: pruning attempt failed for {}", key);
277                }
278                TriePruneResult::RootNotFound => {
279                    error!(?state_hash, ?key, "commit: root not found");
280                    return Err(CommitError::WriteRootNotFound(state_hash).into());
281                }
282                TriePruneResult::Failure(gse) => {
283                    return Err(gse);
284                }
285            }
286        }
287
288        txn.commit()?;
289
290        Ok(state_hash)
291    }
292}
293
294impl StateProvider for LmdbGlobalState {
295    type Reader = LmdbGlobalStateView;
296
297    fn flush(&self, _: FlushRequest) -> FlushResult {
298        if self.environment.is_manual_sync_enabled() {
299            match self.environment.sync() {
300                Ok(_) => FlushResult::Success,
301                Err(err) => FlushResult::Failure(err.into()),
302            }
303        } else {
304            FlushResult::ManualSyncDisabled
305        }
306    }
307
308    fn checkout(&self, state_hash: Digest) -> Result<Option<Self::Reader>, GlobalStateError> {
309        let txn = self.environment.create_read_txn()?;
310        let maybe_root: Option<Trie<Key, StoredValue>> = self.trie_store.get(&txn, &state_hash)?;
311        let maybe_state = maybe_root.map(|_| LmdbGlobalStateView {
312            environment: Arc::clone(&self.environment),
313            store: Arc::clone(&self.trie_store),
314            root_hash: state_hash,
315        });
316        txn.commit()?;
317        Ok(maybe_state)
318    }
319
320    fn tracking_copy(
321        &self,
322        hash: Digest,
323    ) -> Result<Option<TrackingCopy<Self::Reader>>, GlobalStateError> {
324        match self.checkout(hash)? {
325            Some(reader) => Ok(Some(TrackingCopy::new(
326                reader,
327                self.max_query_depth,
328                self.enable_entity,
329            ))),
330            None => Ok(None),
331        }
332    }
333
334    fn empty_root(&self) -> Digest {
335        self.empty_root_hash
336    }
337
338    fn trie(&self, request: TrieRequest) -> TrieResult {
339        let key = request.trie_key();
340        let txn = match self.environment.create_read_txn() {
341            Ok(ro) => ro,
342            Err(err) => return TrieResult::Failure(err.into()),
343        };
344        let raw = match Store::<Digest, Trie<Digest, StoredValue>>::get_raw(
345            &*self.trie_store,
346            &txn,
347            &key,
348        ) {
349            Ok(Some(bytes)) => TrieRaw::new(bytes),
350            Ok(None) => {
351                return TrieResult::ValueNotFound(key.to_string());
352            }
353            Err(err) => {
354                return TrieResult::Failure(err);
355            }
356        };
357        match txn.commit() {
358            Ok(_) => match request.chunk_id() {
359                Some(chunk_id) => TrieResult::Success {
360                    element: TrieElement::Chunked(raw, chunk_id),
361                },
362                None => TrieResult::Success {
363                    element: TrieElement::Raw(raw),
364                },
365            },
366            Err(err) => TrieResult::Failure(err.into()),
367        }
368    }
369
370    /// Persists a trie element.
371    fn put_trie(&self, request: PutTrieRequest) -> PutTrieResult {
372        // We only allow bottom-up persistence of trie elements.
373        // Thus we do not persist the element unless we already have all of its descendants
374        // persisted. It is safer to throw away the element and rely on a follow up attempt
375        // to reacquire it later than to allow it to be persisted which would allow runtime
376        // access to acquire a root hash that is missing one or more children which will
377        // result in undefined behavior if a process attempts to access elements below that
378        // root which are not held locally.
379        let bytes = request.raw().inner();
380        match self.missing_children(bytes) {
381            Ok(missing_children) => {
382                if !missing_children.is_empty() {
383                    let hash = Digest::hash_into_chunks_if_necessary(bytes);
384                    return PutTrieResult::Failure(GlobalStateError::MissingTrieNodeChildren(
385                        hash,
386                        request.take_raw(),
387                        missing_children,
388                    ));
389                }
390            }
391            Err(err) => return PutTrieResult::Failure(err),
392        };
393
394        match self.environment.create_read_write_txn() {
395            Ok(mut txn) => {
396                match put_trie::<Key, StoredValue, RwTransaction, LmdbTrieStore, GlobalStateError>(
397                    &mut txn,
398                    &self.trie_store,
399                    bytes,
400                ) {
401                    Ok(hash) => match txn.commit() {
402                        Ok(_) => PutTrieResult::Success { hash },
403                        Err(err) => PutTrieResult::Failure(err.into()),
404                    },
405                    Err(err) => PutTrieResult::Failure(err),
406                }
407            }
408            Err(err) => PutTrieResult::Failure(err.into()),
409        }
410    }
411
412    /// Finds all of the keys of missing directly descendant `Trie<K,V>` values.
413    fn missing_children(&self, trie_raw: &[u8]) -> Result<Vec<Digest>, GlobalStateError> {
414        let txn = self.environment.create_read_txn()?;
415        let missing_hashes = missing_children::<
416            Key,
417            StoredValue,
418            lmdb::RoTransaction,
419            LmdbTrieStore,
420            GlobalStateError,
421        >(&txn, self.trie_store.deref(), trie_raw)?;
422        txn.commit()?;
423        Ok(missing_hashes)
424    }
425
426    fn enable_entity(&self) -> bool {
427        self.enable_entity
428    }
429}
430
431impl ScratchProvider for DataAccessLayer<LmdbGlobalState> {
432    /// Provide a local cached-only version of engine-state.
433    fn get_scratch_global_state(&self) -> ScratchGlobalState {
434        self.state().create_scratch()
435    }
436
437    /// Writes state cached in an `EngineState<ScratchEngineState>` to LMDB.
438    fn write_scratch_to_db(
439        &self,
440        state_root_hash: Digest,
441        scratch_global_state: ScratchGlobalState,
442    ) -> Result<Digest, GlobalStateError> {
443        let (stored_values, keys_to_prune) = scratch_global_state.into_inner();
444        let post_state_hash = self
445            .state()
446            .put_stored_values(state_root_hash, stored_values)?;
447        if keys_to_prune.is_empty() {
448            return Ok(post_state_hash);
449        }
450        let prune_keys = keys_to_prune.iter().cloned().collect_vec();
451        match self.prune_keys(post_state_hash, &prune_keys) {
452            TriePruneResult::Pruned(post_state_hash) => Ok(post_state_hash),
453            TriePruneResult::MissingKey => Err(GlobalStateError::FailedToPrune(prune_keys)),
454            TriePruneResult::RootNotFound => Err(GlobalStateError::RootNotFound),
455            TriePruneResult::Failure(gse) => Err(gse),
456        }
457    }
458
459    /// Prune keys.
460    fn prune_keys(&self, mut state_root_hash: Digest, keys: &[Key]) -> TriePruneResult {
461        let scratch_trie_store = self.state().get_scratch_store();
462
463        let mut txn = match scratch_trie_store.create_read_write_txn() {
464            Ok(scratch) => scratch,
465            Err(gse) => return TriePruneResult::Failure(gse),
466        };
467
468        for key in keys {
469            let prune_results = prune::<Key, StoredValue, _, _, GlobalStateError>(
470                &mut txn,
471                &scratch_trie_store,
472                &state_root_hash,
473                key,
474            );
475            match prune_results {
476                Ok(TriePruneResult::Pruned(new_root)) => {
477                    state_root_hash = new_root;
478                }
479                Ok(TriePruneResult::MissingKey) => continue, // idempotent outcome
480                Ok(other) => return other,
481                Err(gse) => return TriePruneResult::Failure(gse),
482            }
483        }
484
485        if let Err(gse) = txn.commit() {
486            return TriePruneResult::Failure(gse);
487        }
488
489        if let Err(gse) = scratch_trie_store.write_root_to_db(state_root_hash) {
490            TriePruneResult::Failure(gse)
491        } else {
492            TriePruneResult::Pruned(state_root_hash)
493        }
494    }
495}
496
497/// Creates prepopulated LMDB global state instance that stores data in a temporary directory. As
498/// soon as the `TempDir` instance is dropped all the data stored will be removed from the disk as
499/// well.
500pub fn make_temporary_global_state(
501    initial_data: impl IntoIterator<Item = (Key, StoredValue)>,
502) -> (LmdbGlobalState, Digest, TempDir) {
503    let tempdir = tempfile::tempdir().expect("should create tempdir");
504
505    let lmdb_global_state = {
506        let lmdb_environment = LmdbEnvironment::new(
507            tempdir.path(),
508            DEFAULT_MAX_DB_SIZE,
509            DEFAULT_MAX_READERS,
510            false,
511        )
512        .expect("should create lmdb environment");
513        let lmdb_trie_store = LmdbTrieStore::new(&lmdb_environment, None, DatabaseFlags::default())
514            .expect("should create lmdb trie store");
515        LmdbGlobalState::empty(
516            Arc::new(lmdb_environment),
517            Arc::new(lmdb_trie_store),
518            DEFAULT_MAX_QUERY_DEPTH,
519            DEFAULT_ENABLE_ENTITY,
520        )
521        .expect("should create lmdb global state")
522    };
523
524    let mut root_hash = lmdb_global_state.empty_root_hash;
525
526    let mut effects = Effects::new();
527
528    for (key, stored_value) in initial_data {
529        let transform = TransformV2::new(key.normalize(), TransformKindV2::Write(stored_value));
530        effects.push(transform);
531    }
532
533    root_hash = lmdb_global_state
534        .commit_effects(root_hash, effects)
535        .expect("Creation of account should be a success.");
536
537    (lmdb_global_state, root_hash, tempdir)
538}
539
540#[cfg(test)]
541mod tests {
542    use casper_types::{account::AccountHash, execution::TransformKindV2, CLValue, Digest};
543
544    use crate::global_state::state::scratch::tests::TestPair;
545
546    use super::*;
547
548    fn create_test_pairs() -> Vec<(Key, StoredValue)> {
549        vec![
550            (
551                Key::Account(AccountHash::new([1_u8; 32])),
552                StoredValue::CLValue(CLValue::from_t(1_i32).unwrap()),
553            ),
554            (
555                Key::Account(AccountHash::new([2_u8; 32])),
556                StoredValue::CLValue(CLValue::from_t(2_i32).unwrap()),
557            ),
558        ]
559    }
560
561    fn create_test_pairs_updated() -> [TestPair; 3] {
562        [
563            TestPair {
564                key: Key::Account(AccountHash::new([1u8; 32])),
565                value: StoredValue::CLValue(CLValue::from_t("one".to_string()).unwrap()),
566            },
567            TestPair {
568                key: Key::Account(AccountHash::new([2u8; 32])),
569                value: StoredValue::CLValue(CLValue::from_t("two".to_string()).unwrap()),
570            },
571            TestPair {
572                key: Key::Account(AccountHash::new([3u8; 32])),
573                value: StoredValue::CLValue(CLValue::from_t(3_i32).unwrap()),
574            },
575        ]
576    }
577
578    #[test]
579    fn reads_from_a_checkout_return_expected_values() {
580        let test_pairs = create_test_pairs();
581        let (state, root_hash, _tempdir) = make_temporary_global_state(test_pairs.clone());
582        let checkout = state.checkout(root_hash).unwrap().unwrap();
583        for (key, value) in test_pairs {
584            assert_eq!(Some(value), checkout.read(&key).unwrap());
585        }
586    }
587
588    #[test]
589    fn checkout_fails_if_unknown_hash_is_given() {
590        let (state, _, _tempdir) = make_temporary_global_state(create_test_pairs());
591        let fake_hash: Digest = Digest::hash([1u8; 32]);
592        let result = state.checkout(fake_hash).unwrap();
593        assert!(result.is_none());
594    }
595
596    #[test]
597    fn commit_updates_state() {
598        let test_pairs_updated = create_test_pairs_updated();
599
600        let (state, root_hash, _tempdir) = make_temporary_global_state(create_test_pairs());
601
602        let effects = {
603            let mut tmp = Effects::new();
604            for TestPair { key, value } in &test_pairs_updated {
605                let transform = TransformV2::new(*key, TransformKindV2::Write(value.clone()));
606                tmp.push(transform);
607            }
608            tmp
609        };
610
611        let updated_hash = state.commit_effects(root_hash, effects).unwrap();
612
613        let updated_checkout = state.checkout(updated_hash).unwrap().unwrap();
614
615        for TestPair { key, value } in test_pairs_updated.iter().cloned() {
616            assert_eq!(Some(value), updated_checkout.read(&key).unwrap());
617        }
618    }
619
620    #[test]
621    fn commit_updates_state_and_original_state_stays_intact() {
622        let test_pairs_updated = create_test_pairs_updated();
623
624        let (state, root_hash, _tempdir) = make_temporary_global_state(create_test_pairs());
625
626        let effects = {
627            let mut tmp = Effects::new();
628            for TestPair { key, value } in &test_pairs_updated {
629                let transform = TransformV2::new(*key, TransformKindV2::Write(value.clone()));
630                tmp.push(transform);
631            }
632            tmp
633        };
634
635        let updated_hash = state.commit_effects(root_hash, effects).unwrap();
636
637        let updated_checkout = state.checkout(updated_hash).unwrap().unwrap();
638        for TestPair { key, value } in test_pairs_updated.iter().cloned() {
639            assert_eq!(Some(value), updated_checkout.read(&key).unwrap());
640        }
641
642        let original_checkout = state.checkout(root_hash).unwrap().unwrap();
643        for (key, value) in create_test_pairs().iter().cloned() {
644            assert_eq!(Some(value), original_checkout.read(&key).unwrap());
645        }
646        assert_eq!(
647            None,
648            original_checkout.read(&test_pairs_updated[2].key).unwrap()
649        );
650    }
651}