1use itertools::Itertools;
2use std::{ops::Deref, sync::Arc};
3use tracing::{error, warn};
4
5use lmdb::{DatabaseFlags, RwTransaction};
6
7use tempfile::TempDir;
8
9use casper_types::{
10 execution::{Effects, TransformKindV2, TransformV2},
11 global_state::TrieMerkleProof,
12 Digest, Key, StoredValue,
13};
14
15use super::CommitError;
16use crate::{
17 data_access_layer::{
18 DataAccessLayer, FlushRequest, FlushResult, PutTrieRequest, PutTrieResult, TrieElement,
19 TrieRequest, TrieResult,
20 },
21 global_state::{
22 error::Error as GlobalStateError,
23 state::{
24 commit, put_stored_values, scratch::ScratchGlobalState, CommitProvider,
25 ScratchProvider, StateProvider, StateReader,
26 },
27 store::Store,
28 transaction_source::{lmdb::LmdbEnvironment, Transaction, TransactionSource},
29 trie::{operations::create_hashed_empty_trie, Trie, TrieRaw},
30 trie_store::{
31 lmdb::{LmdbTrieStore, ScratchTrieStore},
32 operations::{
33 keys_with_prefix, missing_children, prune, put_trie, read, read_with_proof,
34 ReadResult, TriePruneResult,
35 },
36 },
37 DEFAULT_ENABLE_ENTITY, DEFAULT_MAX_DB_SIZE, DEFAULT_MAX_QUERY_DEPTH, DEFAULT_MAX_READERS,
38 },
39 tracking_copy::TrackingCopy,
40};
41
42pub struct LmdbGlobalState {
44 pub(crate) environment: Arc<LmdbEnvironment>,
46 pub(crate) trie_store: Arc<LmdbTrieStore>,
48 pub(crate) empty_root_hash: Digest,
50 pub max_query_depth: u64,
52 pub enable_entity: bool,
54}
55
56pub struct LmdbGlobalStateView {
58 pub(crate) environment: Arc<LmdbEnvironment>,
60 pub(crate) store: Arc<LmdbTrieStore>,
62 pub(crate) root_hash: Digest,
64}
65
66impl LmdbGlobalState {
67 pub fn empty(
69 environment: Arc<LmdbEnvironment>,
70 trie_store: Arc<LmdbTrieStore>,
71 max_query_depth: u64,
72 enable_entity: bool,
73 ) -> Result<Self, GlobalStateError> {
74 let root_hash: Digest = {
75 let (root_hash, root) = compute_empty_root_hash()?;
76 let mut txn = environment.create_read_write_txn()?;
77 trie_store.put(&mut txn, &root_hash, &root)?;
78 txn.commit()?;
79 environment.env().sync(true)?;
80 root_hash
81 };
82 Ok(LmdbGlobalState::new(
83 environment,
84 trie_store,
85 root_hash,
86 max_query_depth,
87 enable_entity,
88 ))
89 }
90
91 pub fn new(
94 environment: Arc<LmdbEnvironment>,
95 trie_store: Arc<LmdbTrieStore>,
96 empty_root_hash: Digest,
97 max_query_depth: u64,
98 enable_entity: bool,
99 ) -> Self {
100 LmdbGlobalState {
101 environment,
102 trie_store,
103 empty_root_hash,
104 max_query_depth,
105 enable_entity,
106 }
107 }
108
109 pub fn create_scratch(&self) -> ScratchGlobalState {
111 ScratchGlobalState::new(
112 Arc::clone(&self.environment),
113 Arc::clone(&self.trie_store),
114 self.empty_root_hash,
115 self.max_query_depth,
116 self.enable_entity,
117 )
118 }
119
120 pub(crate) fn get_scratch_store(&self) -> ScratchTrieStore {
122 ScratchTrieStore::new(Arc::clone(&self.trie_store), Arc::clone(&self.environment))
123 }
124
125 pub fn put_stored_values(
127 &self,
128 prestate_hash: Digest,
129 stored_values: Vec<(Key, StoredValue)>,
130 ) -> Result<Digest, GlobalStateError> {
131 let scratch_trie = self.get_scratch_store();
132 let new_state_root = put_stored_values::<_, _, GlobalStateError>(
133 &scratch_trie,
134 &scratch_trie,
135 prestate_hash,
136 stored_values,
137 )?;
138 scratch_trie.write_root_to_db(new_state_root)?;
139 Ok(new_state_root)
140 }
141
142 #[must_use]
144 pub fn environment(&self) -> &LmdbEnvironment {
145 &self.environment
146 }
147
148 #[must_use]
150 pub fn trie_store(&self) -> &LmdbTrieStore {
151 &self.trie_store
152 }
153
154 pub fn empty_state_root_hash(&self) -> Digest {
156 self.empty_root_hash
157 }
158}
159
160fn compute_empty_root_hash() -> Result<(Digest, Trie<Key, StoredValue>), GlobalStateError> {
161 let (root_hash, root) = create_hashed_empty_trie::<Key, StoredValue>()?;
162 Ok((root_hash, root))
163}
164
165impl StateReader<Key, StoredValue> for LmdbGlobalStateView {
166 type Error = GlobalStateError;
167
168 fn read(&self, key: &Key) -> Result<Option<StoredValue>, Self::Error> {
169 let txn = self.environment.create_read_txn()?;
170 let ret = match read::<Key, StoredValue, lmdb::RoTransaction, LmdbTrieStore, Self::Error>(
171 &txn,
172 self.store.deref(),
173 &self.root_hash,
174 key,
175 )? {
176 ReadResult::Found(value) => Some(value),
177 ReadResult::NotFound => None,
178 ReadResult::RootNotFound => panic!("LmdbGlobalState has invalid root"),
179 };
180 txn.commit()?;
181 Ok(ret)
182 }
183
184 fn read_with_proof(
185 &self,
186 key: &Key,
187 ) -> Result<Option<TrieMerkleProof<Key, StoredValue>>, Self::Error> {
188 let txn = self.environment.create_read_txn()?;
189 let ret = match read_with_proof::<
190 Key,
191 StoredValue,
192 lmdb::RoTransaction,
193 LmdbTrieStore,
194 Self::Error,
195 >(&txn, self.store.deref(), &self.root_hash, key)?
196 {
197 ReadResult::Found(value) => Some(value),
198 ReadResult::NotFound => None,
199 ReadResult::RootNotFound => panic!("LmdbGlobalState has invalid root"),
200 };
201 txn.commit()?;
202 Ok(ret)
203 }
204
205 fn keys_with_prefix(&self, prefix: &[u8]) -> Result<Vec<Key>, Self::Error> {
206 let txn = self.environment.create_read_txn()?;
207 let keys_iter = keys_with_prefix::<Key, StoredValue, _, _>(
208 &txn,
209 self.store.deref(),
210 &self.root_hash,
211 prefix,
212 );
213 let mut ret = Vec::new();
214 for result in keys_iter {
215 match result {
216 Ok(key) => ret.push(key),
217 Err(error) => return Err(error),
218 }
219 }
220 txn.commit()?;
221 Ok(ret)
222 }
223}
224
225impl CommitProvider for LmdbGlobalState {
226 fn commit_effects(
227 &self,
228 prestate_hash: Digest,
229 effects: Effects,
230 ) -> Result<Digest, GlobalStateError> {
231 commit::<LmdbEnvironment, LmdbTrieStore, GlobalStateError>(
232 &self.environment,
233 &self.trie_store,
234 prestate_hash,
235 effects,
236 )
237 }
238
239 fn commit_values(
240 &self,
241 prestate_hash: Digest,
242 values_to_write: Vec<(Key, StoredValue)>,
243 keys_to_prune: std::collections::BTreeSet<Key>,
244 ) -> Result<Digest, GlobalStateError> {
245 let post_write_hash = put_stored_values::<LmdbEnvironment, LmdbTrieStore, GlobalStateError>(
246 &self.environment,
247 &self.trie_store,
248 prestate_hash,
249 values_to_write,
250 )?;
251
252 let mut txn = self.environment.create_read_write_txn()?;
253
254 let maybe_root: Option<Trie<Key, StoredValue>> =
255 self.trie_store.get(&txn, &post_write_hash)?;
256
257 if maybe_root.is_none() {
258 return Err(CommitError::RootNotFound(post_write_hash).into());
259 };
260
261 let mut state_hash = post_write_hash;
262
263 for key in keys_to_prune.into_iter() {
264 let prune_result = prune::<Key, StoredValue, _, LmdbTrieStore, GlobalStateError>(
265 &mut txn,
266 &self.trie_store,
267 &state_hash,
268 &key,
269 )?;
270
271 match prune_result {
272 TriePruneResult::Pruned(root_hash) => {
273 state_hash = root_hash;
274 }
275 TriePruneResult::MissingKey => {
276 warn!("commit: pruning attempt failed for {}", key);
277 }
278 TriePruneResult::RootNotFound => {
279 error!(?state_hash, ?key, "commit: root not found");
280 return Err(CommitError::WriteRootNotFound(state_hash).into());
281 }
282 TriePruneResult::Failure(gse) => {
283 return Err(gse);
284 }
285 }
286 }
287
288 txn.commit()?;
289
290 Ok(state_hash)
291 }
292}
293
294impl StateProvider for LmdbGlobalState {
295 type Reader = LmdbGlobalStateView;
296
297 fn flush(&self, _: FlushRequest) -> FlushResult {
298 if self.environment.is_manual_sync_enabled() {
299 match self.environment.sync() {
300 Ok(_) => FlushResult::Success,
301 Err(err) => FlushResult::Failure(err.into()),
302 }
303 } else {
304 FlushResult::ManualSyncDisabled
305 }
306 }
307
308 fn checkout(&self, state_hash: Digest) -> Result<Option<Self::Reader>, GlobalStateError> {
309 let txn = self.environment.create_read_txn()?;
310 let maybe_root: Option<Trie<Key, StoredValue>> = self.trie_store.get(&txn, &state_hash)?;
311 let maybe_state = maybe_root.map(|_| LmdbGlobalStateView {
312 environment: Arc::clone(&self.environment),
313 store: Arc::clone(&self.trie_store),
314 root_hash: state_hash,
315 });
316 txn.commit()?;
317 Ok(maybe_state)
318 }
319
320 fn tracking_copy(
321 &self,
322 hash: Digest,
323 ) -> Result<Option<TrackingCopy<Self::Reader>>, GlobalStateError> {
324 match self.checkout(hash)? {
325 Some(reader) => Ok(Some(TrackingCopy::new(
326 reader,
327 self.max_query_depth,
328 self.enable_entity,
329 ))),
330 None => Ok(None),
331 }
332 }
333
334 fn empty_root(&self) -> Digest {
335 self.empty_root_hash
336 }
337
338 fn trie(&self, request: TrieRequest) -> TrieResult {
339 let key = request.trie_key();
340 let txn = match self.environment.create_read_txn() {
341 Ok(ro) => ro,
342 Err(err) => return TrieResult::Failure(err.into()),
343 };
344 let raw = match Store::<Digest, Trie<Digest, StoredValue>>::get_raw(
345 &*self.trie_store,
346 &txn,
347 &key,
348 ) {
349 Ok(Some(bytes)) => TrieRaw::new(bytes),
350 Ok(None) => {
351 return TrieResult::ValueNotFound(key.to_string());
352 }
353 Err(err) => {
354 return TrieResult::Failure(err);
355 }
356 };
357 match txn.commit() {
358 Ok(_) => match request.chunk_id() {
359 Some(chunk_id) => TrieResult::Success {
360 element: TrieElement::Chunked(raw, chunk_id),
361 },
362 None => TrieResult::Success {
363 element: TrieElement::Raw(raw),
364 },
365 },
366 Err(err) => TrieResult::Failure(err.into()),
367 }
368 }
369
370 fn put_trie(&self, request: PutTrieRequest) -> PutTrieResult {
372 let bytes = request.raw().inner();
380 match self.missing_children(bytes) {
381 Ok(missing_children) => {
382 if !missing_children.is_empty() {
383 let hash = Digest::hash_into_chunks_if_necessary(bytes);
384 return PutTrieResult::Failure(GlobalStateError::MissingTrieNodeChildren(
385 hash,
386 request.take_raw(),
387 missing_children,
388 ));
389 }
390 }
391 Err(err) => return PutTrieResult::Failure(err),
392 };
393
394 match self.environment.create_read_write_txn() {
395 Ok(mut txn) => {
396 match put_trie::<Key, StoredValue, RwTransaction, LmdbTrieStore, GlobalStateError>(
397 &mut txn,
398 &self.trie_store,
399 bytes,
400 ) {
401 Ok(hash) => match txn.commit() {
402 Ok(_) => PutTrieResult::Success { hash },
403 Err(err) => PutTrieResult::Failure(err.into()),
404 },
405 Err(err) => PutTrieResult::Failure(err),
406 }
407 }
408 Err(err) => PutTrieResult::Failure(err.into()),
409 }
410 }
411
412 fn missing_children(&self, trie_raw: &[u8]) -> Result<Vec<Digest>, GlobalStateError> {
414 let txn = self.environment.create_read_txn()?;
415 let missing_hashes = missing_children::<
416 Key,
417 StoredValue,
418 lmdb::RoTransaction,
419 LmdbTrieStore,
420 GlobalStateError,
421 >(&txn, self.trie_store.deref(), trie_raw)?;
422 txn.commit()?;
423 Ok(missing_hashes)
424 }
425
426 fn enable_entity(&self) -> bool {
427 self.enable_entity
428 }
429}
430
431impl ScratchProvider for DataAccessLayer<LmdbGlobalState> {
432 fn get_scratch_global_state(&self) -> ScratchGlobalState {
434 self.state().create_scratch()
435 }
436
437 fn write_scratch_to_db(
439 &self,
440 state_root_hash: Digest,
441 scratch_global_state: ScratchGlobalState,
442 ) -> Result<Digest, GlobalStateError> {
443 let (stored_values, keys_to_prune) = scratch_global_state.into_inner();
444 let post_state_hash = self
445 .state()
446 .put_stored_values(state_root_hash, stored_values)?;
447 if keys_to_prune.is_empty() {
448 return Ok(post_state_hash);
449 }
450 let prune_keys = keys_to_prune.iter().cloned().collect_vec();
451 match self.prune_keys(post_state_hash, &prune_keys) {
452 TriePruneResult::Pruned(post_state_hash) => Ok(post_state_hash),
453 TriePruneResult::MissingKey => Err(GlobalStateError::FailedToPrune(prune_keys)),
454 TriePruneResult::RootNotFound => Err(GlobalStateError::RootNotFound),
455 TriePruneResult::Failure(gse) => Err(gse),
456 }
457 }
458
459 fn prune_keys(&self, mut state_root_hash: Digest, keys: &[Key]) -> TriePruneResult {
461 let scratch_trie_store = self.state().get_scratch_store();
462
463 let mut txn = match scratch_trie_store.create_read_write_txn() {
464 Ok(scratch) => scratch,
465 Err(gse) => return TriePruneResult::Failure(gse),
466 };
467
468 for key in keys {
469 let prune_results = prune::<Key, StoredValue, _, _, GlobalStateError>(
470 &mut txn,
471 &scratch_trie_store,
472 &state_root_hash,
473 key,
474 );
475 match prune_results {
476 Ok(TriePruneResult::Pruned(new_root)) => {
477 state_root_hash = new_root;
478 }
479 Ok(TriePruneResult::MissingKey) => continue, Ok(other) => return other,
481 Err(gse) => return TriePruneResult::Failure(gse),
482 }
483 }
484
485 if let Err(gse) = txn.commit() {
486 return TriePruneResult::Failure(gse);
487 }
488
489 if let Err(gse) = scratch_trie_store.write_root_to_db(state_root_hash) {
490 TriePruneResult::Failure(gse)
491 } else {
492 TriePruneResult::Pruned(state_root_hash)
493 }
494 }
495}
496
497pub fn make_temporary_global_state(
501 initial_data: impl IntoIterator<Item = (Key, StoredValue)>,
502) -> (LmdbGlobalState, Digest, TempDir) {
503 let tempdir = tempfile::tempdir().expect("should create tempdir");
504
505 let lmdb_global_state = {
506 let lmdb_environment = LmdbEnvironment::new(
507 tempdir.path(),
508 DEFAULT_MAX_DB_SIZE,
509 DEFAULT_MAX_READERS,
510 false,
511 )
512 .expect("should create lmdb environment");
513 let lmdb_trie_store = LmdbTrieStore::new(&lmdb_environment, None, DatabaseFlags::default())
514 .expect("should create lmdb trie store");
515 LmdbGlobalState::empty(
516 Arc::new(lmdb_environment),
517 Arc::new(lmdb_trie_store),
518 DEFAULT_MAX_QUERY_DEPTH,
519 DEFAULT_ENABLE_ENTITY,
520 )
521 .expect("should create lmdb global state")
522 };
523
524 let mut root_hash = lmdb_global_state.empty_root_hash;
525
526 let mut effects = Effects::new();
527
528 for (key, stored_value) in initial_data {
529 let transform = TransformV2::new(key.normalize(), TransformKindV2::Write(stored_value));
530 effects.push(transform);
531 }
532
533 root_hash = lmdb_global_state
534 .commit_effects(root_hash, effects)
535 .expect("Creation of account should be a success.");
536
537 (lmdb_global_state, root_hash, tempdir)
538}
539
540#[cfg(test)]
541mod tests {
542 use casper_types::{account::AccountHash, execution::TransformKindV2, CLValue, Digest};
543
544 use crate::global_state::state::scratch::tests::TestPair;
545
546 use super::*;
547
548 fn create_test_pairs() -> Vec<(Key, StoredValue)> {
549 vec![
550 (
551 Key::Account(AccountHash::new([1_u8; 32])),
552 StoredValue::CLValue(CLValue::from_t(1_i32).unwrap()),
553 ),
554 (
555 Key::Account(AccountHash::new([2_u8; 32])),
556 StoredValue::CLValue(CLValue::from_t(2_i32).unwrap()),
557 ),
558 ]
559 }
560
561 fn create_test_pairs_updated() -> [TestPair; 3] {
562 [
563 TestPair {
564 key: Key::Account(AccountHash::new([1u8; 32])),
565 value: StoredValue::CLValue(CLValue::from_t("one".to_string()).unwrap()),
566 },
567 TestPair {
568 key: Key::Account(AccountHash::new([2u8; 32])),
569 value: StoredValue::CLValue(CLValue::from_t("two".to_string()).unwrap()),
570 },
571 TestPair {
572 key: Key::Account(AccountHash::new([3u8; 32])),
573 value: StoredValue::CLValue(CLValue::from_t(3_i32).unwrap()),
574 },
575 ]
576 }
577
578 #[test]
579 fn reads_from_a_checkout_return_expected_values() {
580 let test_pairs = create_test_pairs();
581 let (state, root_hash, _tempdir) = make_temporary_global_state(test_pairs.clone());
582 let checkout = state.checkout(root_hash).unwrap().unwrap();
583 for (key, value) in test_pairs {
584 assert_eq!(Some(value), checkout.read(&key).unwrap());
585 }
586 }
587
588 #[test]
589 fn checkout_fails_if_unknown_hash_is_given() {
590 let (state, _, _tempdir) = make_temporary_global_state(create_test_pairs());
591 let fake_hash: Digest = Digest::hash([1u8; 32]);
592 let result = state.checkout(fake_hash).unwrap();
593 assert!(result.is_none());
594 }
595
596 #[test]
597 fn commit_updates_state() {
598 let test_pairs_updated = create_test_pairs_updated();
599
600 let (state, root_hash, _tempdir) = make_temporary_global_state(create_test_pairs());
601
602 let effects = {
603 let mut tmp = Effects::new();
604 for TestPair { key, value } in &test_pairs_updated {
605 let transform = TransformV2::new(*key, TransformKindV2::Write(value.clone()));
606 tmp.push(transform);
607 }
608 tmp
609 };
610
611 let updated_hash = state.commit_effects(root_hash, effects).unwrap();
612
613 let updated_checkout = state.checkout(updated_hash).unwrap().unwrap();
614
615 for TestPair { key, value } in test_pairs_updated.iter().cloned() {
616 assert_eq!(Some(value), updated_checkout.read(&key).unwrap());
617 }
618 }
619
620 #[test]
621 fn commit_updates_state_and_original_state_stays_intact() {
622 let test_pairs_updated = create_test_pairs_updated();
623
624 let (state, root_hash, _tempdir) = make_temporary_global_state(create_test_pairs());
625
626 let effects = {
627 let mut tmp = Effects::new();
628 for TestPair { key, value } in &test_pairs_updated {
629 let transform = TransformV2::new(*key, TransformKindV2::Write(value.clone()));
630 tmp.push(transform);
631 }
632 tmp
633 };
634
635 let updated_hash = state.commit_effects(root_hash, effects).unwrap();
636
637 let updated_checkout = state.checkout(updated_hash).unwrap().unwrap();
638 for TestPair { key, value } in test_pairs_updated.iter().cloned() {
639 assert_eq!(Some(value), updated_checkout.read(&key).unwrap());
640 }
641
642 let original_checkout = state.checkout(root_hash).unwrap().unwrap();
643 for (key, value) in create_test_pairs().iter().cloned() {
644 assert_eq!(Some(value), original_checkout.read(&key).unwrap());
645 }
646 assert_eq!(
647 None,
648 original_checkout.read(&test_pairs_updated[2].key).unwrap()
649 );
650 }
651}