1use std::{collections::HashMap, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10 crypto::CryptoHash,
11 data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
12 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_cache::ValueCache;
15use linera_chain::{
16 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
17 ChainStateView,
18};
19use linera_execution::{
20 BlobState, ExecutionRuntimeConfig, SharedCommittees, UserContractCode, UserServiceCode,
21 WasmRuntime,
22};
23use linera_views::{
24 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
25 batch::Batch,
26 context::ViewContext,
27 store::{
28 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
29 },
30 views::View,
31 ViewError,
32};
33use serde::{Deserialize, Serialize};
34use tracing::instrument;
35#[cfg(with_testing)]
36use {
37 futures::channel::oneshot::{self, Receiver},
38 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
39 std::{cmp::Reverse, collections::BTreeMap},
40};
41
42use crate::{ChainRuntimeContext, Clock, Storage};
43
/// Prometheus metrics for the storage layer.
///
/// Every metric is registered lazily on first access. The counter vectors carry a
/// single label, [`SOURCE_LABEL`](self::SOURCE_LABEL).
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Name of the label attached to the counter vectors below.
    pub(super) const SOURCE_LABEL: &str = "source";
    /// Label value `"cache"` (presumably: the request was served from a cache; confirm at call sites).
    pub(super) const CACHE: &str = "cache";
    /// Label value `"db"` (presumably: the request reached the database; confirm at call sites).
    pub(super) const DB: &str = "db";

    /// The metric counting how often a blob is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob is written to storage (unlabelled counter).
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often several certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificate are read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is written to storage (unlabelled counter).
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// The latency to load a chain state; a histogram without labels.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is tested for existence from storage.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is written to storage (unlabelled counter).
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// The metric counting how often the network description is read from storage.
    ///
    /// NOTE(review): the registered name is `network_description`, without the `read_`
    /// prefix used by the other read counters; renaming would change the exported metric.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often the network description is written to storage
    /// (unlabelled counter).
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
234
/// Key under which a blob's raw bytes are stored inside its `RootKey::Blob` partition.
pub(crate) const BLOB_KEY: &[u8] = &[42];

/// Key under which the BCS-encoded `BlobState` is stored inside a `RootKey::Blob` partition.
pub(crate) const BLOB_STATE_KEY: &[u8] = &[49];

/// Key under which the BCS-encoded lite certificate is stored inside a
/// `RootKey::ConfirmedBlock` partition.
pub(crate) const LITE_CERTIFICATE_KEY: &[u8] = &[91];

/// Key under which the BCS-encoded confirmed block is stored inside a
/// `RootKey::ConfirmedBlock` partition.
pub(crate) const BLOCK_KEY: &[u8] = &[221];

/// Key under which the BCS-encoded `NetworkDescription` is stored inside the
/// `RootKey::NetworkDescription` partition.
pub(crate) const NETWORK_DESCRIPTION_KEY: &[u8] = &[119];
249
250fn get_block_keys() -> Vec<Vec<u8>> {
251 vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
252}
253
/// A set of pending writes spanning several partitions (root keys).
///
/// Each entry pairs a serialized root key with the key-value pairs to be written
/// under it. Entries are kept in insertion order and are not merged, even when the
/// same root key is pushed more than once.
#[derive(Default)]
#[expect(clippy::type_complexity)]
pub(crate) struct MultiPartitionBatch {
    /// `(root_key, [(key, value), ...])` entries, in insertion order.
    keys_value_bytes: Vec<(Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>)>,
}
259
260impl MultiPartitionBatch {
261 pub(crate) fn new() -> Self {
262 Self::default()
263 }
264
265 pub(crate) fn put_key_values(
266 &mut self,
267 root_key: Vec<u8>,
268 key_values: Vec<(Vec<u8>, Vec<u8>)>,
269 ) {
270 self.keys_value_bytes.push((root_key, key_values));
271 }
272
273 pub(crate) fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
274 self.put_key_values(root_key, vec![(key, value)]);
275 }
276
277 fn add_blob(&mut self, blob: &Blob) {
278 #[cfg(with_metrics)]
279 metrics::WRITE_BLOB_COUNTER.inc();
280 let root_key = RootKey::Blob(blob.id()).bytes();
281 let key = BLOB_KEY.to_vec();
282 self.put_key_value(root_key, key, blob.bytes().to_vec());
283 }
284
285 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
286 let root_key = RootKey::Blob(blob_id).bytes();
287 let key = BLOB_STATE_KEY.to_vec();
288 let value = bcs::to_bytes(blob_state)?;
289 self.put_key_value(root_key, key, value);
290 Ok(())
291 }
292
293 fn add_certificate(
302 &mut self,
303 certificate: &ConfirmedBlockCertificate,
304 ) -> Result<(), ViewError> {
305 #[cfg(with_metrics)]
306 metrics::WRITE_CERTIFICATE_COUNTER.inc();
307 let hash = certificate.hash();
308
309 let root_key = RootKey::ConfirmedBlock(hash).bytes();
311 let mut key_values = Vec::new();
312 let key = LITE_CERTIFICATE_KEY.to_vec();
313 let value = bcs::to_bytes(&certificate.lite_certificate())?;
314 key_values.push((key, value));
315 let key = BLOCK_KEY.to_vec();
316 let value = bcs::to_bytes(&certificate.value())?;
317 key_values.push((key, value));
318 self.put_key_values(root_key, key_values);
319
320 let chain_id = certificate.value().block().header.chain_id;
322 let height = certificate.value().block().header.height;
323 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
324 let height_key = to_height_key(height);
325 let index_value = bcs::to_bytes(&hash)?;
326 self.put_key_value(index_root_key, height_key, index_value);
327
328 Ok(())
329 }
330
331 fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
332 #[cfg(with_metrics)]
333 metrics::WRITE_EVENT_COUNTER.inc();
334 let key = to_event_key(event_id);
335 let root_key = RootKey::Event(event_id.chain_id).bytes();
336 self.put_key_value(root_key, key, value);
337 }
338
339 fn add_network_description(
340 &mut self,
341 information: &NetworkDescription,
342 ) -> Result<(), ViewError> {
343 #[cfg(with_metrics)]
344 metrics::WRITE_NETWORK_DESCRIPTION.inc();
345 let root_key = RootKey::NetworkDescription.bytes();
346 let key = NETWORK_DESCRIPTION_KEY.to_vec();
347 let value = bcs::to_bytes(information)?;
348 self.put_key_value(root_key, key, value);
349 Ok(())
350 }
351}
352
/// Sizing parameters for the in-memory caches held by `StorageCaches`.
///
/// Each `*_cache_size` value is handed unchanged to `ValueCache::new` for the
/// corresponding cache.
// NOTE(review): whether a size counts entries or bytes is decided by `linera_cache` — confirm.
#[derive(Clone, Copy, Debug)]
pub struct StorageCacheConfig {
    /// Size of the blob cache.
    pub blob_cache_size: usize,
    /// Size of the confirmed block cache.
    pub confirmed_block_cache_size: usize,
    /// Size of the certificate cache.
    pub certificate_cache_size: usize,
    /// Size of the raw (serialized) certificate cache.
    pub certificate_raw_cache_size: usize,
    /// Size of the event cache.
    pub event_cache_size: usize,
    /// Cleanup interval, in seconds, shared by all the caches.
    pub cache_cleanup_interval_secs: u64,
}
363
/// Cache configuration used in tests: every cache has size 1000 and the cleanup
/// interval is the default of `linera_cache`.
#[cfg(with_testing)]
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    blob_cache_size: 1000,
    confirmed_block_cache_size: 1000,
    certificate_cache_size: 1000,
    certificate_raw_cache_size: 1000,
    event_cache_size: 1000,
    cache_cleanup_interval_secs: linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS,
};
374
/// A certificate in serialized form, as a pair of byte strings.
// NOTE(review): presumably `(lite certificate bytes, confirmed block bytes)` — confirm
// against the code that fills the `certificate_raw` cache.
type RawCertificate = (Vec<u8>, Vec<u8>);
377
/// The in-memory caches used by the storage layer.
///
/// Every cache sits behind an `Arc`, so cloning this struct yields handles to the
/// same underlying caches.
#[derive(Clone)]
pub struct StorageCaches {
    /// Blobs, keyed by blob ID.
    pub(crate) blob: Arc<ValueCache<BlobId, Blob>>,
    /// Confirmed blocks, keyed by hash.
    pub(crate) confirmed_block: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Deserialized confirmed-block certificates, keyed by hash.
    pub(crate) certificate: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
    /// Serialized certificates, keyed by hash.
    pub(crate) certificate_raw: Arc<ValueCache<CryptoHash, RawCertificate>>,
    /// Event values, keyed by event ID.
    pub(crate) event: Arc<ValueCache<EventId, Vec<u8>>>,
}
391
392impl StorageCaches {
393 pub fn new(sizes: StorageCacheConfig) -> Self {
395 let interval = sizes.cache_cleanup_interval_secs;
396 Self {
397 blob: Arc::new(ValueCache::new(sizes.blob_cache_size, interval)),
398 confirmed_block: Arc::new(ValueCache::new(sizes.confirmed_block_cache_size, interval)),
399 certificate: Arc::new(ValueCache::new(sizes.certificate_cache_size, interval)),
400 certificate_raw: Arc::new(ValueCache::new(sizes.certificate_raw_cache_size, interval)),
401 event: Arc::new(ValueCache::new(sizes.event_cache_size, interval)),
402 }
403 }
404}
405
/// Storage backed by a key-value `Database`, with a pluggable `Clock`.
///
/// The clock defaults to [`WallClock`].
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying database; partitions are opened from it by root key.
    pub(crate) database: Arc<Database>,
    /// The time source.
    clock: Clock,
    /// Thread pool from `linera_execution`.
    // NOTE(review): presumably shared with the execution runtime — confirm.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime to use, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Contract code per application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Service code per application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// Committees handle from `linera_execution`.
    shared_committees: SharedCommittees,
    /// In-memory caches for blobs, blocks, certificates and events.
    caches: StorageCaches,
    /// Configuration of the execution runtime.
    execution_runtime_config: ExecutionRuntimeConfig,
}
419
/// Identifies a partition of the key-value database.
///
/// The BCS encoding of a value of this type (see `RootKey::bytes`) is used as the
/// root key of the partition.
///
/// The declaration order of the variants is significant: the unit tests check that
/// the first serialized byte is `CHAIN_ID_TAG` (0) for `ChainState` and `BLOB_ID_TAG`
/// (2) for `Blob`, so reordering, inserting or removing variants changes the
/// serialized root keys.
#[derive(Debug, Serialize, Deserialize)]
pub enum RootKey {
    /// The state of the given chain.
    ChainState(ChainId),
    /// The certificate (lite certificate and block) with the given hash.
    ConfirmedBlock(CryptoHash),
    /// The blob with the given ID, and its blob state.
    Blob(BlobId),
    /// The events of the given chain.
    Event(ChainId),
    /// Not used in the visible part of this file.
    // NOTE(review): presumably kept so that the following variants retain their tags — confirm.
    Placeholder,
    /// The network description.
    NetworkDescription,
    /// The state of the block exporter with the given number.
    BlockExporterState(u32),
    /// The `height -> certificate hash` index of the given chain.
    BlockByHeight(ChainId),
}
431
/// First byte of a serialized `RootKey::ChainState` (checked by the unit tests).
const CHAIN_ID_TAG: u8 = 0;
/// First byte of a serialized `RootKey::Blob` (checked by the unit tests).
const BLOB_ID_TAG: u8 = 2;
434
435impl RootKey {
436 pub fn bytes(&self) -> Vec<u8> {
437 bcs::to_bytes(self).unwrap()
438 }
439}
440
/// An event ID without its chain ID.
///
/// Events are stored in a partition per chain (`RootKey::Event`), so the key inside
/// that partition only needs the stream and the index.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RestrictedEventId {
    /// The stream the event belongs to.
    pub stream_id: StreamId,
    /// The index of the event.
    pub index: u32,
}
446
447pub(crate) fn to_event_key(event_id: &EventId) -> Vec<u8> {
448 let restricted_event_id = RestrictedEventId {
449 stream_id: event_id.stream_id.clone(),
450 index: event_id.index,
451 };
452 bcs::to_bytes(&restricted_event_id).unwrap()
453}
454
455pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
456 bcs::to_bytes(&height).unwrap()
457}
458
459fn is_chain_state(root_key: &[u8]) -> bool {
460 if root_key.is_empty() {
461 return false;
462 }
463 root_key[0] == CHAIN_ID_TAG
464}
465
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::{CryptoHash, TestString},
        data_types::{BlockHeight, Epoch, Round, Timestamp},
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };
    use linera_chain::{
        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
        types::ConfirmedBlockCertificate,
    };
    use linera_views::{
        memory::MemoryDatabase,
        store::{KeyValueDatabase, ReadableKeyValueStore as _},
    };

    use crate::{
        db_storage::{
            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
        },
        DbStorage, Storage, TestClock,
    };

    /// Builds an unsigned certificate for an empty confirmed block on `chain_id` at
    /// `height`.
    ///
    /// Every hash in the header is derived from a field name and `label`, so two
    /// certificates built with different labels differ in all their header hashes.
    #[cfg(with_testing)]
    fn make_test_certificate(
        chain_id: ChainId,
        height: BlockHeight,
        label: &str,
    ) -> ConfirmedBlockCertificate {
        let hash_of = |field: &str| CryptoHash::new(&TestString::new(format!("{field}_{label}")));
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: hash_of("state_hash"),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: hash_of("tx_hash"),
                messages_hash: hash_of("msg_hash"),
                previous_message_blocks_hash: hash_of("pmb_hash"),
                previous_event_blocks_hash: hash_of("peb_hash"),
                oracle_responses_hash: hash_of("oracle_hash"),
                events_hash: hash_of("events_hash"),
                blobs_hash: hash_of("blobs_hash"),
                operation_results_hash: hash_of("op_results_hash"),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        ConfirmedBlockCertificate::new(ConfirmedBlock::new(block), Round::Fast, vec![])
    }

    /// A serialized `RootKey::Blob` is `BLOB_ID_TAG` followed by the BCS-encoded blob ID.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::Blob(blob_id).bytes();
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
    }

    /// A serialized `RootKey::ChainState` is `CHAIN_ID_TAG` followed by the BCS-encoded
    /// chain ID.
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(
            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
            chain_id
        );
    }

    /// The key of an event starts with the BCS encoding of its stream ID.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let key = to_event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }

    /// Both the `BlockByHeight` root key and the height key round-trip through BCS.
    #[test]
    fn test_root_key_block_by_height_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let height = BlockHeight(42);

        // The root key is a one-byte tag followed by the chain ID.
        let root_key = RootKey::BlockByHeight(chain_id).bytes();
        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
        assert_eq!(deserialized_chain_id, chain_id);

        // The height key is the bare BCS encoding of the height.
        let height_key = to_height_key(height);
        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
        assert_eq!(deserialized_height, height);
    }

    /// Writing a certificate also writes the `height -> hash` index entry.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let certificate = make_test_certificate(chain_id, height, "height_index");

        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read the index entry directly from the `BlockByHeight` partition.
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }

    /// Certificates are returned in the order of the requested heights, with `None`
    /// for heights that have no certificate.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let mut batch = MultiPartitionBatch::new();
        let mut expected_certs = vec![];

        for height in [1, 3, 5] {
            // The label makes the header hashes differ per height.
            let cert = make_test_certificate(chain_id, BlockHeight(height), &format!("{height}"));
            expected_certs.push((height, cert.clone()));
            batch.add_certificate(&cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Heights in ascending order.
        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );

        // Heights out of order: the result follows the request order.
        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );

        // A missing height and a duplicated height.
        let heights = vec![
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 4);
        assert!(result[0].is_some());
        assert!(result[1].is_none());
        assert!(result[2].is_some());
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            result[3].as_ref().unwrap().hash()
        );

        // No heights at all.
        let heights = vec![];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }

    /// The height index is kept per chain: equal heights on different chains do not
    /// collide.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));

        let mut batch = MultiPartitionBatch::new();

        let cert_a = make_test_certificate(chain_a, BlockHeight(10), "a");
        batch.add_certificate(&cert_a).unwrap();

        let cert_b = make_test_certificate(chain_b, BlockHeight(10), "b");
        batch.add_certificate(&cert_b).unwrap();

        storage.write_batch(batch).await.unwrap();

        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // A height that was never written on this chain.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }

    /// Reading by hash and reading by height return the same certificate.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let mut batch = MultiPartitionBatch::new();
        let cert = make_test_certificate(chain_id, BlockHeight(7), "consistency");
        let hash = cert.hash();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();

        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }

    /// A certificate written without its index entry becomes readable by height
    /// once `write_certificate_height_indices` has been called.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_write_certificate_height_indices_populates_reverse_index() {
        use linera_views::{batch::Batch, store::WritableKeyValueStore as _};

        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let height = BlockHeight(10);
        let cert = make_test_certificate(chain_id, height, "reverse_index");
        let hash = cert.hash();

        // Write the certificate by hand, bypassing `add_certificate`, so that no
        // height index entry exists yet.
        let root_key = RootKey::ConfirmedBlock(hash).bytes();
        let store = storage.database.open_shared(&root_key).unwrap();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(
            crate::db_storage::LITE_CERTIFICATE_KEY.to_vec(),
            bcs::to_bytes(&cert.lite_certificate()).unwrap(),
        );
        batch.put_key_value_bytes(
            crate::db_storage::BLOCK_KEY.to_vec(),
            bcs::to_bytes(&cert.value()).unwrap(),
        );
        store.write_batch(batch).await.unwrap();

        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_none(),
            "Height index should not exist before write_certificate_height_indices"
        );

        let cert_by_hash = storage.read_certificate(hash).await.unwrap();
        assert!(cert_by_hash.is_some(), "Certificate should exist by hash");

        storage
            .write_certificate_height_indices(chain_id, &[(height, hash)])
            .await
            .unwrap();

        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_some(),
            "Height index should exist after write_certificate_height_indices"
        );
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            hash,
            "Certificate retrieved by height should match original"
        );
    }
}
1014
/// A root key assignment for dual stores: chain states go to the first store and
/// everything else goes to the second.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
1019
1020impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
1021 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
1022 if root_key.is_empty() {
1023 return Ok(StoreInUse::Second);
1024 }
1025 let store = match is_chain_state(root_key) {
1026 true => StoreInUse::First,
1027 false => StoreInUse::Second,
1028 };
1029 Ok(store)
1030 }
1031}
1032
/// A `Clock` implementation that reads the current time from `Timestamp::now()`.
#[derive(Clone)]
pub struct WallClock;
1036
1037#[cfg_attr(not(web), async_trait)]
1038#[cfg_attr(web, async_trait(?Send))]
1039impl Clock for WallClock {
1040 fn current_time(&self) -> Timestamp {
1041 Timestamp::now()
1042 }
1043
1044 async fn sleep_until(&self, timestamp: Timestamp) {
1045 let delta = timestamp.delta_since(Timestamp::now());
1046 if delta > TimeDelta::ZERO {
1047 linera_base::time::timer::sleep(delta.as_duration()).await
1048 }
1049 }
1050}
1051
/// The state of a `TestClock`.
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current time of the clock.
    time: Timestamp,
    /// The pending sleepers, grouped by wake-up time.
    ///
    /// The keys are wrapped in `Reverse`, so `split_off(&Reverse(t))` returns
    /// exactly the sleepers whose wake-up time is at most `t`.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback called with the requested time of every new sleep. If it
    /// returns `true` and that time is in the future, the clock jumps to it at once.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
1061
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the current time to `time` and wakes every sleeper whose wake-up time
    /// is at most `time`.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are reversed, so the split-off tail holds the wake-up times <= `time`.
        let due = self.sleeps.split_off(&Reverse(time));
        for (_, waiters) in due {
            for waiter in waiters {
                // The receiver may already be gone; that is not an error.
                let _ = waiter.send(());
            }
        }
    }

    /// Registers a sleep until `time` and returns the receiver that is notified
    /// when the sleep is over.
    ///
    /// The sleep callback, if any, is always consulted first. If it asks to
    /// auto-advance and `time` is in the future, the clock jumps to `time`, which
    /// also wakes the other sleepers that became due.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let auto_advance = match &self.sleep_callback {
            Some(callback) => callback(time),
            None => false,
        };
        if auto_advance && time > self.time {
            self.set(time);
        }
        if self.time >= time {
            // Already due, possibly thanks to the jump above: notify immediately.
            let _ = sender.send(());
        } else {
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
1093
/// A clock for tests. Its time changes only through `TestClock::set`/`TestClock::add`, or
/// through an installed sleep callback. Clones share the same underlying state.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);

#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the current simulated time.
    fn current_time(&self) -> Timestamp {
        let inner = self.lock();
        inner.time
    }

    /// Waits until the simulated time reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // The guard is confined to this block so that it is released before awaiting.
        let wake_up = {
            let mut inner = self.lock();
            inner.add_sleep_until(timestamp)
        };
        // A cancelled channel (sender dropped) is treated like a wake-up.
        let _ = wake_up.await;
    }
}
1114
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock in its default state.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time and wakes every sleeper whose deadline has been reached.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by `delta` (saturating) and wakes the sleepers that
    /// became due.
    pub fn add(&self, delta: TimeDelta) {
        // A single guard is held so that reading and updating the time is atomic.
        let mut guard = self.lock();
        let time = guard.time.saturating_add(delta);
        guard.set(time);
    }

    /// Returns the current simulated time.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Installs a callback that is called with the requested deadline on every
    /// `sleep_until`. When it returns `true` and the deadline is in the future, the clock
    /// jumps to that deadline instead of blocking the sleeper.
    ///
    /// NOTE(review): the callback runs while the clock's mutex is held, so it presumably
    /// must not call back into this clock. Confirm with callers.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Removes the callback installed by `set_sleep_callback`, if any.
    pub fn clear_sleep_callback(&self) {
        self.lock().sleep_callback = None;
    }

    /// Locks the shared state.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
1159
1160#[cfg_attr(not(web), async_trait)]
1161#[cfg_attr(web, async_trait(?Send))]
1162impl<Database, C> Storage for DbStorage<Database, C>
1163where
1164 Database: KeyValueDatabase<
1165 Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
1166 Error: Send + Sync,
1167 > + Clone
1168 + linera_base::util::traits::AutoTraits
1169 + 'static,
1170 C: Clock + Clone + Send + Sync + 'static,
1171{
1172 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
1173 type Clock = C;
1174 type BlockExporterContext = ViewContext<u32, Database::Store>;
1175
    /// Returns a reference to the clock this storage was constructed with.
    fn clock(&self) -> &C {
        &self.clock
    }
1179
    /// Returns the shared execution thread pool held by this storage.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
1183
1184 #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
1185 async fn load_chain(
1186 &self,
1187 chain_id: ChainId,
1188 ) -> Result<ChainStateView<Self::Context>, ViewError> {
1189 #[cfg(with_metrics)]
1190 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
1191 let runtime_context = ChainRuntimeContext {
1192 storage: self.clone(),
1193 thread_pool: self.thread_pool.clone(),
1194 chain_id,
1195 execution_runtime_config: self.execution_runtime_config,
1196 user_contracts: self.user_contracts.clone(),
1197 user_services: self.user_services.clone(),
1198 };
1199 let root_key = RootKey::ChainState(chain_id).bytes();
1200 let store = self.database.open_exclusive(&root_key)?;
1201 let context = ViewContext::create_root_context(store, runtime_context).await?;
1202 ChainStateView::load(context).await
1203 }
1204
1205 #[instrument(level = "trace", skip_all, fields(%blob_id))]
1206 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1207 if self.caches.blob.contains(&blob_id) {
1208 #[cfg(with_metrics)]
1209 metrics::CONTAINS_BLOB_COUNTER
1210 .with_label_values(&[metrics::CACHE])
1211 .inc();
1212 return Ok(true);
1213 }
1214 let root_key = RootKey::Blob(blob_id).bytes();
1215 let store = self.database.open_shared(&root_key)?;
1216 let test = store.contains_key(BLOB_KEY).await?;
1217 #[cfg(with_metrics)]
1218 metrics::CONTAINS_BLOB_COUNTER
1219 .with_label_values(&[metrics::DB])
1220 .inc();
1221 Ok(test)
1222 }
1223
1224 #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
1225 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
1226 let mut missing_blobs = Vec::new();
1227 #[cfg(with_metrics)]
1228 let mut cache_hits: u64 = 0;
1229 #[cfg(with_metrics)]
1230 let mut db_checks: u64 = 0;
1231 for blob_id in blob_ids {
1232 if self.caches.blob.contains(blob_id) {
1233 #[cfg(with_metrics)]
1234 {
1235 cache_hits += 1;
1236 }
1237 continue;
1238 }
1239 #[cfg(with_metrics)]
1240 {
1241 db_checks += 1;
1242 }
1243 let root_key = RootKey::Blob(*blob_id).bytes();
1244 let store = self.database.open_shared(&root_key)?;
1245 if !store.contains_key(BLOB_KEY).await? {
1246 missing_blobs.push(*blob_id);
1247 }
1248 }
1249 #[cfg(with_metrics)]
1250 {
1251 if cache_hits > 0 {
1252 metrics::CONTAINS_BLOBS_COUNTER
1253 .with_label_values(&[metrics::CACHE])
1254 .inc_by(cache_hits);
1255 }
1256 if db_checks > 0 {
1257 metrics::CONTAINS_BLOBS_COUNTER
1258 .with_label_values(&[metrics::DB])
1259 .inc_by(db_checks);
1260 }
1261 }
1262 Ok(missing_blobs)
1263 }
1264
1265 #[instrument(skip_all, fields(%blob_id))]
1266 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1267 let root_key = RootKey::Blob(blob_id).bytes();
1268 let store = self.database.open_shared(&root_key)?;
1269 let test = store.contains_key(BLOB_STATE_KEY).await?;
1270 #[cfg(with_metrics)]
1271 metrics::CONTAINS_BLOB_STATE_COUNTER
1272 .with_label_values(&[metrics::DB])
1273 .inc();
1274 Ok(test)
1275 }
1276
1277 #[instrument(skip_all, fields(%hash))]
1278 async fn read_confirmed_block(
1279 &self,
1280 hash: CryptoHash,
1281 ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError> {
1282 if let Some(block) = self.caches.confirmed_block.get(&hash) {
1283 #[cfg(with_metrics)]
1284 metrics::READ_CONFIRMED_BLOCK_COUNTER
1285 .with_label_values(&[metrics::CACHE])
1286 .inc();
1287 return Ok(Some(block));
1288 }
1289 let root_key = RootKey::ConfirmedBlock(hash).bytes();
1290 let store = self.database.open_shared(&root_key)?;
1291 let value = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await?;
1292 #[cfg(with_metrics)]
1293 metrics::READ_CONFIRMED_BLOCK_COUNTER
1294 .with_label_values(&[metrics::DB])
1295 .inc();
1296 match value {
1297 Some(block) => Ok(Some(self.caches.confirmed_block.insert(&hash, block))),
1298 None => Ok(None),
1299 }
1300 }
1301
1302 #[instrument(skip_all)]
1303 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
1304 &self,
1305 hashes: I,
1306 ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError> {
1307 let hashes = hashes.into_iter().collect::<Vec<_>>();
1308 if hashes.is_empty() {
1309 return Ok(Vec::new());
1310 }
1311 let mut results = vec![None; hashes.len()];
1312 let mut misses = Vec::new();
1313 for (i, hash) in hashes.iter().enumerate() {
1314 if let Some(block) = self.caches.confirmed_block.get(hash) {
1315 results[i] = Some(block);
1316 } else {
1317 misses.push(i);
1318 }
1319 }
1320 if !misses.is_empty() {
1321 let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
1322 let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
1323 for (miss_idx, root_key) in misses.iter().zip(root_keys) {
1324 let store = self.database.open_shared(&root_key)?;
1325 if let Some(block) = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await? {
1326 results[*miss_idx] = Some(
1327 self.caches
1328 .confirmed_block
1329 .insert(&hashes[*miss_idx], block),
1330 );
1331 }
1332 }
1333 }
1334 #[cfg(with_metrics)]
1335 {
1336 let cache_hits = (hashes.len() - misses.len()) as u64;
1337 if cache_hits > 0 {
1338 metrics::READ_CONFIRMED_BLOCKS_COUNTER
1339 .with_label_values(&[metrics::CACHE])
1340 .inc_by(cache_hits);
1341 }
1342 let db_reads = misses.len() as u64;
1343 if db_reads > 0 {
1344 metrics::READ_CONFIRMED_BLOCKS_COUNTER
1345 .with_label_values(&[metrics::DB])
1346 .inc_by(db_reads);
1347 }
1348 }
1349 Ok(results)
1350 }
1351
1352 #[instrument(skip_all, fields(%blob_id))]
1353 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError> {
1354 if let Some(blob) = self.caches.blob.get(&blob_id) {
1355 #[cfg(with_metrics)]
1356 metrics::READ_BLOB_COUNTER
1357 .with_label_values(&[metrics::CACHE])
1358 .inc();
1359 return Ok(Some(blob));
1360 }
1361 let root_key = RootKey::Blob(blob_id).bytes();
1362 let store = self.database.open_shared(&root_key)?;
1363 let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
1364 #[cfg(with_metrics)]
1365 metrics::READ_BLOB_COUNTER
1366 .with_label_values(&[metrics::DB])
1367 .inc();
1368 match maybe_blob_bytes {
1369 Some(blob_bytes) => {
1370 let blob = Blob::new_with_id_unchecked(blob_id, blob_bytes);
1371 Ok(Some(self.caches.blob.insert(&blob_id, blob)))
1372 }
1373 None => Ok(None),
1374 }
1375 }
1376
1377 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1378 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError> {
1379 if blob_ids.is_empty() {
1380 return Ok(Vec::new());
1381 }
1382 let mut blobs = Vec::new();
1383 for blob_id in blob_ids {
1384 blobs.push(self.read_blob(*blob_id).await?);
1385 }
1386 Ok(blobs)
1387 }
1388
1389 #[instrument(skip_all, fields(%blob_id))]
1390 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
1391 let root_key = RootKey::Blob(blob_id).bytes();
1392 let store = self.database.open_shared(&root_key)?;
1393 let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1394 #[cfg(with_metrics)]
1395 metrics::READ_BLOB_STATE_COUNTER
1396 .with_label_values(&[metrics::DB])
1397 .inc();
1398 Ok(blob_state)
1399 }
1400
1401 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1402 async fn read_blob_states(
1403 &self,
1404 blob_ids: &[BlobId],
1405 ) -> Result<Vec<Option<BlobState>>, ViewError> {
1406 if blob_ids.is_empty() {
1407 return Ok(Vec::new());
1408 }
1409 let mut blob_states = Vec::new();
1410 for blob_id in blob_ids {
1411 blob_states.push(self.read_blob_state(*blob_id).await?);
1412 }
1413 Ok(blob_states)
1414 }
1415
1416 #[instrument(skip_all, fields(blob_id = %blob.id()))]
1417 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
1418 let mut batch = MultiPartitionBatch::new();
1419 batch.add_blob(blob);
1420 self.write_batch(batch).await?;
1421 Ok(())
1422 }
1423
1424 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1425 async fn maybe_write_blob_states(
1426 &self,
1427 blob_ids: &[BlobId],
1428 blob_state: BlobState,
1429 ) -> Result<(), ViewError> {
1430 if blob_ids.is_empty() {
1431 return Ok(());
1432 }
1433 let mut maybe_blob_states = Vec::new();
1434 for blob_id in blob_ids {
1435 let root_key = RootKey::Blob(*blob_id).bytes();
1436 let store = self.database.open_shared(&root_key)?;
1437 let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1438 maybe_blob_states.push(maybe_blob_state);
1439 }
1440 let mut batch = MultiPartitionBatch::new();
1441 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
1442 match maybe_blob_state {
1443 None => {
1444 batch.add_blob_state(*blob_id, &blob_state)?;
1445 }
1446 Some(state) => {
1447 if state.epoch < blob_state.epoch {
1448 batch.add_blob_state(*blob_id, &blob_state)?;
1449 }
1450 }
1451 }
1452 }
1453 self.write_batch(batch).await?;
1457 Ok(())
1458 }
1459
1460 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1461 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
1462 if blobs.is_empty() {
1463 return Ok(Vec::new());
1464 }
1465 let mut batch = MultiPartitionBatch::new();
1466 let mut blob_states = Vec::new();
1467 for blob in blobs {
1468 let root_key = RootKey::Blob(blob.id()).bytes();
1469 let store = self.database.open_shared(&root_key)?;
1470 let has_state = store.contains_key(BLOB_STATE_KEY).await?;
1471 blob_states.push(has_state);
1472 if has_state {
1473 batch.add_blob(blob);
1474 }
1475 }
1476 self.write_batch(batch).await?;
1477 Ok(blob_states)
1478 }
1479
1480 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1481 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
1482 if blobs.is_empty() {
1483 return Ok(());
1484 }
1485 let mut batch = MultiPartitionBatch::new();
1486 for blob in blobs {
1487 batch.add_blob(blob);
1488 }
1489 self.write_batch(batch).await
1490 }
1491
1492 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1493 async fn write_blobs_and_certificate(
1494 &self,
1495 blobs: &[Blob],
1496 certificate: &ConfirmedBlockCertificate,
1497 ) -> Result<(), ViewError> {
1498 let mut batch = MultiPartitionBatch::new();
1499 for blob in blobs {
1500 batch.add_blob(blob);
1501 }
1502 batch.add_certificate(certificate)?;
1503 self.write_batch(batch).await
1504 }
1505
1506 #[instrument(skip_all, fields(%hash))]
1507 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1508 if self.caches.certificate.contains(&hash) || self.caches.certificate_raw.contains(&hash) {
1509 #[cfg(with_metrics)]
1510 metrics::CONTAINS_CERTIFICATE_COUNTER
1511 .with_label_values(&[metrics::CACHE])
1512 .inc();
1513 return Ok(true);
1514 }
1515 let root_key = RootKey::ConfirmedBlock(hash).bytes();
1516 let store = self.database.open_shared(&root_key)?;
1517 let results = store.contains_keys(&get_block_keys()).await?;
1518 #[cfg(with_metrics)]
1519 metrics::CONTAINS_CERTIFICATE_COUNTER
1520 .with_label_values(&[metrics::DB])
1521 .inc();
1522 Ok(results[0] && results[1])
1523 }
1524
1525 #[instrument(skip_all, fields(%hash))]
1526 async fn read_certificate(
1527 &self,
1528 hash: CryptoHash,
1529 ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
1530 if let Some(cert) = self.caches.certificate.get(&hash) {
1532 #[cfg(with_metrics)]
1533 metrics::READ_CERTIFICATE_COUNTER
1534 .with_label_values(&[metrics::CACHE])
1535 .inc();
1536 return Ok(Some(cert));
1537 }
1538 if let Some(raw) = self.caches.certificate_raw.get(&hash) {
1540 #[cfg(with_metrics)]
1541 metrics::READ_CERTIFICATE_COUNTER
1542 .with_label_values(&[metrics::CACHE])
1543 .inc();
1544 return self.deserialize_and_cache_certificate(&raw.0, &raw.1);
1545 }
1546 let root_key = RootKey::ConfirmedBlock(hash).bytes();
1548 let store = self.database.open_shared(&root_key)?;
1549 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1550 #[cfg(with_metrics)]
1551 metrics::READ_CERTIFICATE_COUNTER
1552 .with_label_values(&[metrics::DB])
1553 .inc();
1554 let Some(lite_cert_bytes) = values[0].as_ref() else {
1555 return Ok(None);
1556 };
1557 let Some(confirmed_block_bytes) = values[1].as_ref() else {
1558 return Ok(None);
1559 };
1560 self.caches.certificate_raw.insert(
1561 &hash,
1562 (lite_cert_bytes.clone(), confirmed_block_bytes.clone()),
1563 );
1564 self.deserialize_and_cache_certificate(lite_cert_bytes, confirmed_block_bytes)
1565 }
1566
1567 #[instrument(skip_all)]
1568 async fn read_certificates(
1569 &self,
1570 hashes: &[CryptoHash],
1571 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1572 let raw_certs = self.read_certificates_raw(hashes).await?;
1573
1574 raw_certs
1575 .into_iter()
1576 .map(|maybe_raw| {
1577 let Some(raw) = maybe_raw else {
1578 return Ok(None);
1579 };
1580 self.deserialize_and_cache_certificate(&raw.0, &raw.1)
1581 })
1582 .collect()
1583 }
1584
1585 #[instrument(skip_all)]
1586 async fn read_certificates_raw(
1587 &self,
1588 hashes: &[CryptoHash],
1589 ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1590 if hashes.is_empty() {
1591 return Ok(Vec::new());
1592 }
1593 let mut results = vec![None; hashes.len()];
1594 let mut misses = Vec::new();
1595 for (i, hash) in hashes.iter().enumerate() {
1596 if let Some(raw) = self.caches.certificate_raw.get(hash) {
1597 results[i] = Some(raw);
1598 } else {
1599 misses.push(i);
1600 }
1601 }
1602 if !misses.is_empty() {
1603 let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
1604 let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
1605 for (miss_idx, root_key) in misses.iter().zip(root_keys) {
1606 let store = self.database.open_shared(&root_key)?;
1607 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1608 if let (Some(lite), Some(block)) = (values[0].as_ref(), values[1].as_ref()) {
1609 results[*miss_idx] = Some(
1610 self.caches
1611 .certificate_raw
1612 .insert(&hashes[*miss_idx], (lite.clone(), block.clone())),
1613 );
1614 }
1615 }
1616 }
1617 #[cfg(with_metrics)]
1618 {
1619 let cache_hits = (hashes.len() - misses.len()) as u64;
1620 if cache_hits > 0 {
1621 metrics::READ_CERTIFICATES_COUNTER
1622 .with_label_values(&[metrics::CACHE])
1623 .inc_by(cache_hits);
1624 }
1625 let db_reads = misses.len() as u64;
1626 if db_reads > 0 {
1627 metrics::READ_CERTIFICATES_COUNTER
1628 .with_label_values(&[metrics::DB])
1629 .inc_by(db_reads);
1630 }
1631 }
1632 Ok(results)
1633 }
1634
1635 async fn read_certificate_hashes_by_heights(
1636 &self,
1637 chain_id: ChainId,
1638 heights: &[BlockHeight],
1639 ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1640 if heights.is_empty() {
1641 return Ok(Vec::new());
1642 }
1643
1644 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1645 let store = self.database.open_shared(&index_root_key)?;
1646 let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
1647 let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
1648 let hash_options: Vec<Option<CryptoHash>> = hash_bytes
1649 .into_iter()
1650 .map(|opt| {
1651 opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
1652 .transpose()
1653 })
1654 .collect::<Result<_, _>>()?;
1655
1656 Ok(hash_options)
1657 }
1658
1659 #[instrument(skip_all)]
1660 async fn read_certificates_by_heights_raw(
1661 &self,
1662 chain_id: ChainId,
1663 heights: &[BlockHeight],
1664 ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1665 let hashes: Vec<Option<CryptoHash>> = self
1666 .read_certificate_hashes_by_heights(chain_id, heights)
1667 .await?;
1668
1669 let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
1671 for (index, maybe_hash) in hashes.iter().enumerate() {
1672 if let Some(hash) = maybe_hash {
1673 indices.entry(*hash).or_default().push(index);
1674 }
1675 }
1676
1677 let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
1679
1680 let mut result = vec![None; heights.len()];
1681
1682 for (raw_cert, hash) in self
1683 .read_certificates_raw(&unique_hashes)
1684 .await?
1685 .into_iter()
1686 .zip(unique_hashes)
1687 {
1688 if let Some(idx_list) = indices.get(&hash) {
1689 for &index in idx_list {
1690 result[index] = raw_cert.clone();
1691 }
1692 } else {
1693 tracing::warn!(
1695 hash=?hash,
1696 "certificate hash not found in indices map",
1697 );
1698 }
1699 }
1700
1701 Ok(result)
1702 }
1703
1704 #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1705 async fn read_certificates_by_heights(
1706 &self,
1707 chain_id: ChainId,
1708 heights: &[BlockHeight],
1709 ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1710 self.read_certificates_by_heights_raw(chain_id, heights)
1711 .await?
1712 .into_iter()
1713 .map(|maybe_raw| match maybe_raw {
1714 None => Ok(None),
1715 Some(raw) => self.deserialize_and_cache_certificate(&raw.0, &raw.1),
1716 })
1717 .collect()
1718 }
1719
1720 #[instrument(skip_all, fields(%chain_id, indices_len = indices.len()))]
1721 async fn write_certificate_height_indices(
1722 &self,
1723 chain_id: ChainId,
1724 indices: &[(BlockHeight, CryptoHash)],
1725 ) -> Result<(), ViewError> {
1726 if indices.is_empty() {
1727 return Ok(());
1728 }
1729
1730 let mut batch = MultiPartitionBatch::new();
1731 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1732 let key_values: Vec<(Vec<u8>, Vec<u8>)> = indices
1733 .iter()
1734 .map(|(height, hash)| {
1735 let height_key = to_height_key(*height);
1736 let hash_value = bcs::to_bytes(hash).unwrap();
1737 (height_key, hash_value)
1738 })
1739 .collect();
1740 batch.put_key_values(index_root_key, key_values);
1741 self.write_batch(batch).await
1742 }
1743
1744 #[instrument(skip_all, fields(event_id = ?event_id))]
1745 async fn read_event(&self, event_id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError> {
1746 if let Some(event) = self.caches.event.get(&event_id) {
1747 #[cfg(with_metrics)]
1748 metrics::READ_EVENT_COUNTER
1749 .with_label_values(&[metrics::CACHE])
1750 .inc();
1751 return Ok(Some(event));
1752 }
1753 let event_key = to_event_key(&event_id);
1754 let root_key = RootKey::Event(event_id.chain_id).bytes();
1755 let store = self.database.open_shared(&root_key)?;
1756 let event = store.read_value_bytes(&event_key).await?;
1757 #[cfg(with_metrics)]
1758 metrics::READ_EVENT_COUNTER
1759 .with_label_values(&[metrics::DB])
1760 .inc();
1761 match event {
1762 Some(event_bytes) => Ok(Some(self.caches.event.insert(&event_id, event_bytes))),
1763 None => Ok(None),
1764 }
1765 }
1766
1767 #[instrument(skip_all, fields(event_id = ?event_id))]
1768 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1769 if self.caches.event.contains(&event_id) {
1770 #[cfg(with_metrics)]
1771 metrics::CONTAINS_EVENT_COUNTER
1772 .with_label_values(&[metrics::CACHE])
1773 .inc();
1774 return Ok(true);
1775 }
1776 let event_key = to_event_key(&event_id);
1777 let root_key = RootKey::Event(event_id.chain_id).bytes();
1778 let store = self.database.open_shared(&root_key)?;
1779 let exists = store.contains_key(&event_key).await?;
1780 #[cfg(with_metrics)]
1781 metrics::CONTAINS_EVENT_COUNTER
1782 .with_label_values(&[metrics::DB])
1783 .inc();
1784 Ok(exists)
1785 }
1786
1787 #[instrument(skip_all, fields(%chain_id, %stream_id, %start_index))]
1788 async fn read_events_from_index(
1789 &self,
1790 chain_id: &ChainId,
1791 stream_id: &StreamId,
1792 start_index: u32,
1793 ) -> Result<Vec<IndexAndEvent>, ViewError> {
1794 let root_key = RootKey::Event(*chain_id).bytes();
1795 let store = self.database.open_shared(&root_key)?;
1796 let mut keys = Vec::new();
1797 let mut indices = Vec::new();
1798 let prefix = bcs::to_bytes(stream_id).unwrap();
1799 for short_key in store.find_keys_by_prefix(&prefix).await? {
1800 let index = bcs::from_bytes::<u32>(&short_key)?;
1801 if index >= start_index {
1802 let mut key = prefix.clone();
1803 key.extend(short_key);
1804 keys.push(key);
1805 indices.push(index);
1806 }
1807 }
1808 let values = store.read_multi_values_bytes(&keys).await?;
1809 let mut returned_values = Vec::new();
1810 for (index, value) in indices.into_iter().zip(values) {
1811 let event = value.unwrap();
1812 returned_values.push(IndexAndEvent { index, event });
1813 }
1814 Ok(returned_values)
1815 }
1816
1817 #[instrument(skip_all)]
1818 async fn write_events(
1819 &self,
1820 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1821 ) -> Result<(), ViewError> {
1822 let mut batch = MultiPartitionBatch::new();
1823 for (event_id, value) in events {
1824 batch.add_event(&event_id, value);
1825 }
1826 self.write_batch(batch).await
1827 }
1828
1829 #[instrument(skip_all)]
1830 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1831 let root_key = RootKey::NetworkDescription.bytes();
1832 let store = self.database.open_shared(&root_key)?;
1833 let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1834 #[cfg(with_metrics)]
1835 metrics::READ_NETWORK_DESCRIPTION
1836 .with_label_values(&[metrics::DB])
1837 .inc();
1838 Ok(maybe_value)
1839 }
1840
1841 #[instrument(skip_all)]
1842 async fn write_network_description(
1843 &self,
1844 information: &NetworkDescription,
1845 ) -> Result<(), ViewError> {
1846 let mut batch = MultiPartitionBatch::new();
1847 batch.add_network_description(information)?;
1848 self.write_batch(batch).await?;
1849 Ok(())
1850 }
1851
    /// Returns a reference to the [`SharedCommittees`] held by this storage.
    fn shared_committees(&self) -> &SharedCommittees {
        &self.shared_committees
    }
1855
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1859
1860 #[instrument(skip_all)]
1861 async fn block_exporter_context(
1862 &self,
1863 block_exporter_id: u32,
1864 ) -> Result<Self::BlockExporterContext, ViewError> {
1865 let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1866 let store = self.database.open_exclusive(&root_key)?;
1867 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1868 }
1869}
1870
1871impl<Database, C> DbStorage<Database, C>
1872where
1873 Database: KeyValueDatabase + Clone,
1874 Database::Store: KeyValueStore + Clone,
1875 C: Clock,
1876 Database::Error: Send + Sync,
1877{
1878 #[instrument(skip_all)]
1879 fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1880 hashes
1881 .iter()
1882 .map(|hash| RootKey::ConfirmedBlock(*hash).bytes())
1883 .collect()
1884 }
1885
1886 fn deserialize_and_cache_certificate(
1887 &self,
1888 lite_cert_bytes: &[u8],
1889 confirmed_block_bytes: &[u8],
1890 ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
1891 let lite = bcs::from_bytes::<LiteCertificate>(lite_cert_bytes)?;
1892 let block = bcs::from_bytes::<ConfirmedBlock>(confirmed_block_bytes)?;
1893 let hash = block.hash();
1894 self.caches.confirmed_block.insert(&hash, block.clone());
1895 let certificate = lite
1896 .with_value(block)
1897 .ok_or(ViewError::InconsistentEntries)?;
1898 let arc = self.caches.certificate.insert(&hash, certificate);
1899 Ok(Some(arc))
1900 }
1901
1902 #[instrument(skip_all)]
1903 async fn write_entry(
1904 store: &Database::Store,
1905 key_values: Vec<(Vec<u8>, Vec<u8>)>,
1906 ) -> Result<(), ViewError> {
1907 let mut batch = Batch::new();
1908 for (key, value) in key_values {
1909 batch.put_key_value_bytes(key, value);
1910 }
1911 store.write_batch(batch).await?;
1912 Ok(())
1913 }
1914
1915 #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1916 pub(crate) async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1917 if batch.keys_value_bytes.is_empty() {
1918 return Ok(());
1919 }
1920 let mut futures = Vec::new();
1921 for (root_key, key_values) in batch.keys_value_bytes {
1922 let store = self.database.open_shared(&root_key)?;
1923 futures.push(async move { Self::write_entry(&store, key_values).await });
1924 }
1925 futures::future::try_join_all(futures).await?;
1926 Ok(())
1927 }
1928}
1929
impl<Database, C> DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone + 'static,
    Database::Error: Send + Sync,
    Database::Store: KeyValueStore + Clone + 'static,
    C: Clock + Clone + Send + Sync + 'static,
{
    /// Builds a storage on top of `database`.
    ///
    /// The user contract/service maps start empty, the caches are sized by
    /// `cache_sizes`, and the execution runtime configuration starts at its default.
    pub(crate) fn new(
        database: Database,
        wasm_runtime: Option<WasmRuntime>,
        cache_sizes: StorageCacheConfig,
        clock: C,
    ) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            // NOTE(review): the argument `20` passed to `ThreadPool::new` is hard-coded;
            // its rationale is not visible in this block.
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            shared_committees: SharedCommittees::new(),
            caches: StorageCaches::new(cache_sizes),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets the `allow_application_logs` flag of the execution runtime configuration and
    /// returns the updated storage (builder style).
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1964
1965impl<Database> DbStorage<Database, WallClock>
1966where
1967 Database: KeyValueDatabase + Clone + 'static,
1968 Database::Error: Send + Sync,
1969 Database::Store: KeyValueStore + Clone + 'static,
1970{
1971 pub async fn maybe_create_and_connect(
1972 config: &Database::Config,
1973 namespace: &str,
1974 wasm_runtime: Option<WasmRuntime>,
1975 cache_sizes: StorageCacheConfig,
1976 ) -> Result<Self, ViewError> {
1977 let database = Database::maybe_create_and_connect(config, namespace).await?;
1978 let storage = Self::new(database, wasm_runtime, cache_sizes, WallClock);
1979 Ok(storage)
1980 }
1981
1982 pub async fn connect(
1983 config: &Database::Config,
1984 namespace: &str,
1985 wasm_runtime: Option<WasmRuntime>,
1986 cache_sizes: StorageCacheConfig,
1987 ) -> Result<Self, ViewError> {
1988 let database = Database::connect(config, namespace).await?;
1989 let storage = Self::new(database, wasm_runtime, cache_sizes, WallClock);
1990 Ok(storage)
1991 }
1992
1993 pub async fn list_blob_ids(
1995 config: &Database::Config,
1996 namespace: &str,
1997 ) -> Result<Vec<BlobId>, ViewError> {
1998 let database = Database::connect(config, namespace).await?;
1999 let root_keys = database.list_root_keys().await?;
2000 let mut blob_ids = Vec::new();
2001 for root_key in root_keys {
2002 if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
2003 let root_key_red = &root_key[1..];
2004 let blob_id = bcs::from_bytes(root_key_red)?;
2005 blob_ids.push(blob_id);
2006 }
2007 }
2008 Ok(blob_ids)
2009 }
2010}
2011
2012impl<Database> DbStorage<Database, WallClock>
2013where
2014 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
2015 Database::Error: Send + Sync,
2016{
2017 pub async fn list_chain_ids(
2019 config: &Database::Config,
2020 namespace: &str,
2021 ) -> Result<Vec<ChainId>, ViewError> {
2022 let database = Database::connect(config, namespace).await?;
2023 let root_keys = database.list_root_keys().await?;
2024 let mut chain_ids = Vec::new();
2025 for root_key in root_keys {
2026 if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
2027 let root_key_red = &root_key[1..];
2028 let chain_id = bcs::from_bytes(root_key_red)?;
2029 chain_ids.push(chain_id);
2030 }
2031 }
2032 Ok(chain_ids)
2033 }
2034}
2035
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a test storage with a fresh test configuration, a generated namespace and
    /// a new [`TestClock`].
    ///
    /// # Panics
    ///
    /// Panics if the test configuration or the storage cannot be created.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Creates a test storage in `namespace` with the given clock.
    ///
    /// The database is obtained with `recreate_and_connect`, the caches use
    /// `DEFAULT_STORAGE_CACHE_CONFIG`, and `assert_is_migrated_storage` is run before the
    /// storage is returned.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, ViewError> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        let cache_sizes = DEFAULT_STORAGE_CACHE_CONFIG;
        let storage = Self::new(database, wasm_runtime, cache_sizes, clock);
        storage.assert_is_migrated_storage().await?;
        Ok(storage)
    }
}