Skip to main content

linera_storage/
db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashMap, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10    crypto::CryptoHash,
11    data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
12    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_cache::ValueCache;
15use linera_chain::{
16    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
17    ChainStateView,
18};
19use linera_execution::{
20    BlobState, ExecutionRuntimeConfig, SharedCommittees, UserContractCode, UserServiceCode,
21    WasmRuntime,
22};
23use linera_views::{
24    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
25    batch::Batch,
26    context::ViewContext,
27    store::{
28        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
29    },
30    views::View,
31    ViewError,
32};
33use serde::{Deserialize, Serialize};
34use tracing::instrument;
35#[cfg(with_testing)]
36use {
37    futures::channel::oneshot::{self, Receiver},
38    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
39    std::{cmp::Reverse, collections::BTreeMap},
40};
41
42use crate::{ChainRuntimeContext, Clock, Storage};
43
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter,
        register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounter, IntCounterVec};

    /// Label name for distinguishing cache hits vs DB reads.
    pub(super) const SOURCE_LABEL: &str = "source";
    /// Label value for items served from the in-memory cache.
    pub(super) const CACHE: &str = "cache";
    /// Label value for items served from the database.
    pub(super) const DB: &str = "db";

    /// The metric counting how often a blob is tested for existence from storage
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a hashed certificate value is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[SOURCE_LABEL],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_blob",
            "The metric counting how often a blob is written to storage",
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            // Typo fixed: previously read "certificate are read".
            "The metric counting how often certificates are read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is tested for existence from storage
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_event",
            "The metric counting how often an event is written to storage",
        )
    });

    /// The metric counting how often the network description is read from storage.
    ///
    /// NOTE(review): the metric name is `network_description`, which is inconsistent
    /// with `write_network_description` below (one would expect
    /// `read_network_description`). Renaming would break any existing dashboards,
    /// so the name is deliberately left unchanged — confirm before renaming.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[SOURCE_LABEL],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounter> = LazyLock::new(|| {
        register_int_counter(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
        )
    });
}
234
/// The key used for blobs. The Blob ID itself is contained in the root key.
pub(crate) const BLOB_KEY: &[u8] = &[42];

/// The key used for blob states. The Blob ID itself is contained in the root key.
pub(crate) const BLOB_STATE_KEY: &[u8] = &[49];

/// The key used for lite certificates. The cryptohash itself is contained in the root key.
pub(crate) const LITE_CERTIFICATE_KEY: &[u8] = &[91];

/// The key used for confirmed blocks. The cryptohash itself is contained in the root key.
pub(crate) const BLOCK_KEY: &[u8] = &[221];

/// The key used for the network description.
pub(crate) const NETWORK_DESCRIPTION_KEY: &[u8] = &[119];

/// Returns the in-partition keys under which a certificate is stored:
/// the lite-certificate key first, then the confirmed-block key.
fn get_block_keys() -> Vec<Vec<u8>> {
    [LITE_CERTIFICATE_KEY, BLOCK_KEY]
        .into_iter()
        .map(|key| key.to_vec())
        .collect()
}
253
/// A batch of key/value writes grouped by partition.
///
/// Each entry pairs a root key (identifying the partition) with the list of
/// key/value pairs to write under that partition.
#[derive(Default)]
#[expect(clippy::type_complexity)]
pub(crate) struct MultiPartitionBatch {
    /// Pairs of `(root_key, [(key, value), ...])` to be written.
    keys_value_bytes: Vec<(Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>)>,
}
259
260impl MultiPartitionBatch {
261    pub(crate) fn new() -> Self {
262        Self::default()
263    }
264
265    pub(crate) fn put_key_values(
266        &mut self,
267        root_key: Vec<u8>,
268        key_values: Vec<(Vec<u8>, Vec<u8>)>,
269    ) {
270        self.keys_value_bytes.push((root_key, key_values));
271    }
272
273    pub(crate) fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
274        self.put_key_values(root_key, vec![(key, value)]);
275    }
276
277    fn add_blob(&mut self, blob: &Blob) {
278        #[cfg(with_metrics)]
279        metrics::WRITE_BLOB_COUNTER.inc();
280        let root_key = RootKey::Blob(blob.id()).bytes();
281        let key = BLOB_KEY.to_vec();
282        self.put_key_value(root_key, key, blob.bytes().to_vec());
283    }
284
285    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
286        let root_key = RootKey::Blob(blob_id).bytes();
287        let key = BLOB_STATE_KEY.to_vec();
288        let value = bcs::to_bytes(blob_state)?;
289        self.put_key_value(root_key, key, value);
290        Ok(())
291    }
292
293    /// Adds a certificate to the batch.
294    ///
295    /// Writes both the certificate data (indexed by hash) and a height index
296    /// (mapping chain_id + height to hash).
297    ///
298    /// Note: If called multiple times with the same `(chain_id, height)`, the height
299    /// index will be overwritten. The caller is responsible for ensuring that
300    /// certificates at the same height have the same hash.
301    fn add_certificate(
302        &mut self,
303        certificate: &ConfirmedBlockCertificate,
304    ) -> Result<(), ViewError> {
305        #[cfg(with_metrics)]
306        metrics::WRITE_CERTIFICATE_COUNTER.inc();
307        let hash = certificate.hash();
308
309        // Write certificate data by hash
310        let root_key = RootKey::ConfirmedBlock(hash).bytes();
311        let mut key_values = Vec::new();
312        let key = LITE_CERTIFICATE_KEY.to_vec();
313        let value = bcs::to_bytes(&certificate.lite_certificate())?;
314        key_values.push((key, value));
315        let key = BLOCK_KEY.to_vec();
316        let value = bcs::to_bytes(&certificate.value())?;
317        key_values.push((key, value));
318        self.put_key_values(root_key, key_values);
319
320        // Write height index: chain_id -> height -> hash
321        let chain_id = certificate.value().block().header.chain_id;
322        let height = certificate.value().block().header.height;
323        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
324        let height_key = to_height_key(height);
325        let index_value = bcs::to_bytes(&hash)?;
326        self.put_key_value(index_root_key, height_key, index_value);
327
328        Ok(())
329    }
330
331    fn add_event(&mut self, event_id: &EventId, value: Vec<u8>) {
332        #[cfg(with_metrics)]
333        metrics::WRITE_EVENT_COUNTER.inc();
334        let key = to_event_key(event_id);
335        let root_key = RootKey::Event(event_id.chain_id).bytes();
336        self.put_key_value(root_key, key, value);
337    }
338
339    fn add_network_description(
340        &mut self,
341        information: &NetworkDescription,
342    ) -> Result<(), ViewError> {
343        #[cfg(with_metrics)]
344        metrics::WRITE_NETWORK_DESCRIPTION.inc();
345        let root_key = RootKey::NetworkDescription.bytes();
346        let key = NETWORK_DESCRIPTION_KEY.to_vec();
347        let value = bcs::to_bytes(information)?;
348        self.put_key_value(root_key, key, value);
349        Ok(())
350    }
351}
352
/// Individual cache sizes for each `ValueCache` in `DbStorage`.
#[derive(Clone, Copy, Debug)]
pub struct StorageCacheConfig {
    /// Capacity of the blob cache (passed to `ValueCache::new`).
    pub blob_cache_size: usize,
    /// Capacity of the confirmed-block cache.
    pub confirmed_block_cache_size: usize,
    /// Capacity of the certificate cache.
    pub certificate_cache_size: usize,
    /// Capacity of the raw (serialized) certificate cache.
    pub certificate_raw_cache_size: usize,
    /// Capacity of the event cache.
    pub event_cache_size: usize,
    /// Interval in seconds between cache cleanup passes.
    pub cache_cleanup_interval_secs: u64,
}
363
/// Default cache configuration for testing.
#[cfg(with_testing)]
pub const DEFAULT_STORAGE_CACHE_CONFIG: StorageCacheConfig = StorageCacheConfig {
    // 1000 entries per cache — ample for test workloads.
    blob_cache_size: 1000,
    confirmed_block_cache_size: 1000,
    certificate_cache_size: 1000,
    certificate_raw_cache_size: 1000,
    event_cache_size: 1000,
    cache_cleanup_interval_secs: linera_cache::DEFAULT_CLEANUP_INTERVAL_SECS,
};
374
/// Raw certificate bytes: (lite_certificate_bytes, confirmed_block_bytes).
type RawCertificate = (Vec<u8>, Vec<u8>);

/// Groups all `ValueCache` instances used by `DbStorage`.
///
/// All caches use `ValueCache` which stores values as `Arc<V>` internally,
/// ensuring memory-efficient sharing across consumers. Adding a new cache
/// here automatically inherits Arc-based sharing.
#[derive(Clone)]
pub struct StorageCaches {
    /// Blobs, keyed by blob ID.
    pub(crate) blob: Arc<ValueCache<BlobId, Blob>>,
    /// Confirmed blocks, keyed by hash.
    pub(crate) confirmed_block: Arc<ValueCache<CryptoHash, ConfirmedBlock>>,
    /// Deserialized confirmed-block certificates, keyed by hash.
    pub(crate) certificate: Arc<ValueCache<CryptoHash, ConfirmedBlockCertificate>>,
    /// Still-serialized certificate bytes, keyed by hash.
    pub(crate) certificate_raw: Arc<ValueCache<CryptoHash, RawCertificate>>,
    /// Event payload bytes, keyed by full event ID.
    pub(crate) event: Arc<ValueCache<EventId, Vec<u8>>>,
}
391
392impl StorageCaches {
393    /// Creates all caches with the given sizes.
394    pub fn new(sizes: StorageCacheConfig) -> Self {
395        let interval = sizes.cache_cleanup_interval_secs;
396        Self {
397            blob: Arc::new(ValueCache::new(sizes.blob_cache_size, interval)),
398            confirmed_block: Arc::new(ValueCache::new(sizes.confirmed_block_cache_size, interval)),
399            certificate: Arc::new(ValueCache::new(sizes.certificate_cache_size, interval)),
400            certificate_raw: Arc::new(ValueCache::new(sizes.certificate_raw_cache_size, interval)),
401            event: Arc::new(ValueCache::new(sizes.event_cache_size, interval)),
402        }
403    }
404}
405
/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The key-value database backing all storage partitions.
    pub(crate) database: Arc<Database>,
    /// Time source (defaults to `WallClock`; tests may substitute another clock).
    clock: Clock,
    /// Shared thread pool from `linera_execution`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime to use, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// Loaded user contract code, keyed by application ID.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Loaded user service code, keyed by application ID.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// Committees shared across chains.
    shared_committees: SharedCommittees,
    /// In-memory value caches (blobs, blocks, certificates, events).
    caches: StorageCaches,
    /// Configuration of the execution runtime.
    execution_runtime_config: ExecutionRuntimeConfig,
}
419
/// Identifies a top-level storage partition.
///
/// The BCS serialization of a `RootKey` — its variant index followed by the
/// variant payload — is used as the partition's root key (see `bytes()`).
/// The variant order is therefore part of the storage format: do NOT reorder
/// variants (`CHAIN_ID_TAG` and `BLOB_ID_TAG` depend on the current order).
#[derive(Debug, Serialize, Deserialize)]
pub enum RootKey {
    /// Partition holding the state of a chain.
    ChainState(ChainId),
    /// Partition holding a confirmed block and its lite certificate, by hash.
    ConfirmedBlock(CryptoHash),
    /// Partition holding a blob and its blob state.
    Blob(BlobId),
    /// Partition holding the events of a chain.
    Event(ChainId),
    /// Unused variant — presumably kept to preserve the BCS indices of the
    /// variants that follow; confirm before removing.
    Placeholder,
    /// Partition holding the network description.
    NetworkDescription,
    /// Partition holding a block exporter's state, by exporter index.
    BlockExporterState(u32),
    /// Partition indexing a chain's block hashes by block height.
    BlockByHeight(ChainId),
}
431
/// BCS variant index of `RootKey::ChainState`: the first byte of its serialized form.
const CHAIN_ID_TAG: u8 = 0;
/// BCS variant index of `RootKey::Blob`: the first byte of its serialized form.
const BLOB_ID_TAG: u8 = 2;
434
435impl RootKey {
436    pub fn bytes(&self) -> Vec<u8> {
437        bcs::to_bytes(self).unwrap()
438    }
439}
440
/// An `EventId` without its chain ID.
///
/// Used as the in-partition key for events: the chain ID is already encoded
/// in the partition's root key (`RootKey::Event`), so only the stream ID and
/// index need to be part of the key.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RestrictedEventId {
    /// The event's stream ID.
    pub stream_id: StreamId,
    /// The event's index within the stream.
    pub index: u32,
}
446
447pub(crate) fn to_event_key(event_id: &EventId) -> Vec<u8> {
448    let restricted_event_id = RestrictedEventId {
449        stream_id: event_id.stream_id.clone(),
450        index: event_id.index,
451    };
452    bcs::to_bytes(&restricted_event_id).unwrap()
453}
454
455pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
456    bcs::to_bytes(&height).unwrap()
457}
458
459fn is_chain_state(root_key: &[u8]) -> bool {
460    if root_key.is_empty() {
461        return false;
462    }
463    root_key[0] == CHAIN_ID_TAG
464}
465
466#[cfg(test)]
467mod tests {
468    use linera_base::{
469        crypto::{CryptoHash, TestString},
470        data_types::{BlockHeight, Epoch, Round, Timestamp},
471        identifiers::{
472            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
473            StreamName,
474        },
475    };
476    use linera_chain::{
477        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
478        types::ConfirmedBlockCertificate,
479    };
480    use linera_views::{
481        memory::MemoryDatabase,
482        store::{KeyValueDatabase, ReadableKeyValueStore as _},
483    };
484
485    use crate::{
486        db_storage::{
487            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
488        },
489        DbStorage, Storage, TestClock,
490    };
491
492    // Several functionalities of the storage rely on the way that the serialization
493    // is done. Thus we need to check that the serialization works in the way that
494    // we expect.
495
496    // The listing of the blobs in `list_blob_ids` depends on the serialization
497    // of `RootKey::Blob`.
498    #[test]
499    fn test_root_key_blob_serialization() {
500        let hash = CryptoHash::default();
501        let blob_type = BlobType::default();
502        let blob_id = BlobId::new(hash, blob_type);
503        let root_key = RootKey::Blob(blob_id).bytes();
504        assert_eq!(root_key[0], BLOB_ID_TAG);
505        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
506    }
507
508    // The listing of the chains in `list_chain_ids` depends on the serialization
509    // of `RootKey::ChainState`.
510    #[test]
511    fn test_root_key_chainstate_serialization() {
512        let hash = CryptoHash::default();
513        let chain_id = ChainId(hash);
514        let root_key = RootKey::ChainState(chain_id).bytes();
515        assert_eq!(root_key[0], CHAIN_ID_TAG);
516        assert_eq!(
517            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
518            chain_id
519        );
520    }
521
522    // The listing of the events in `read_events_from_index` depends on the
523    // serialization of `RootKey::Event`.
524    #[test]
525    fn test_root_key_event_serialization() {
526        let hash = CryptoHash::test_hash("49");
527        let chain_id = ChainId(hash);
528        let application_description_hash = CryptoHash::test_hash("42");
529        let application_id = ApplicationId::new(application_description_hash);
530        let application_id = GenericApplicationId::User(application_id);
531        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
532        let stream_id = StreamId {
533            application_id,
534            stream_name,
535        };
536        let prefix = bcs::to_bytes(&stream_id).unwrap();
537
538        let index = 1567;
539        let event_id = EventId {
540            chain_id,
541            stream_id,
542            index,
543        };
544        let key = to_event_key(&event_id);
545        assert!(key.starts_with(&prefix));
546    }
547
548    // The height index lookup depends on the serialization of RootKey::BlockByHeight
549    // and to_height_key, following the same pattern as Event.
550    #[test]
551    fn test_root_key_block_by_height_serialization() {
552        use linera_base::data_types::BlockHeight;
553
554        let hash = CryptoHash::default();
555        let chain_id = ChainId(hash);
556        let height = BlockHeight(42);
557
558        // RootKey::BlockByHeight uses only ChainId for partitioning (like Event)
559        let root_key = RootKey::BlockByHeight(chain_id).bytes();
560        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
561        assert_eq!(deserialized_chain_id, chain_id);
562
563        // Height is encoded as a key (like index in Event)
564        let height_key = to_height_key(height);
565        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
566        assert_eq!(deserialized_height, height);
567    }
568
    // Checks that `add_certificate` writes the height index entry
    // (chain_id -> height -> hash) alongside the certificate data.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        // Create test storage
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Create a test certificate at a specific height
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        // Minimal block: every hash is a dummy derived from a test string and
        // the body is empty — only `chain_id` and `height` matter for this test.
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
                messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_msg_blocks_hash",
                )),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_event_blocks_hash",
                )),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        // An unsigned certificate (empty vote list) is sufficient for storage tests.
        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);

        // Write certificate
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Verify height index was created (following Event pattern)
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        // The stored value must be the BCS-serialized certificate hash.
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }
630
    // Checks `read_certificates_by_heights`: ordered, out-of-order, missing,
    // duplicated, and empty height queries against one chain.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write certificates at heights 1, 3, 5
        let mut batch = MultiPartitionBatch::new();
        let mut expected_certs = vec![];

        for height in [1, 3, 5] {
            // NOTE(review): "{height}" below is a literal substring, NOT
            // interpolation — all three iterations use identical hash strings.
            // This is harmless here (the `height` header field already makes
            // each block distinct), but `format!` may have been intended.
            let block = Block {
                header: BlockHeader {
                    chain_id,
                    epoch: Epoch::ZERO,
                    height: BlockHeight(height),
                    timestamp: Timestamp::from(0),
                    state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
                    previous_block_hash: None,
                    authenticated_signer: None,
                    transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
                    messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
                    previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                        "pmb_hash_{height}",
                    )),
                    previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                        "peb_hash_{height}",
                    )),
                    oracle_responses_hash: CryptoHash::new(&TestString::new(
                        "oracle_hash_{height}",
                    )),
                    events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
                    blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
                    operation_results_hash: CryptoHash::new(&TestString::new(
                        "op_results_hash_{height}",
                    )),
                },
                body: BlockBody {
                    transactions: vec![],
                    messages: vec![],
                    previous_message_blocks: Default::default(),
                    previous_event_blocks: Default::default(),
                    oracle_responses: vec![],
                    events: vec![],
                    blobs: vec![],
                    operation_results: vec![],
                },
            };
            let confirmed_block = ConfirmedBlock::new(block);
            let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
            expected_certs.push((height, cert.clone()));
            batch.add_certificate(&cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Test: Read in order [1, 3, 5]
        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );

        // Test: Read out of order [5, 1, 3] — results must follow query order.
        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );

        // Test: Read with a missing height and a duplicate [1, 2, 3, 3]
        let heights = vec![
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 4); // BlockHeight(3) was duplicated.
        assert!(result[0].is_some());
        assert!(result[1].is_none()); // Height 2 doesn't exist
        assert!(result[2].is_some());
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            result[3].as_ref().unwrap().hash()
        ); // Both correspond to height 3

        // Test: Empty heights
        let heights = vec![];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }
754
    /// Verifies that the certificate height index is keyed per chain: two chains
    /// holding certificates at the same height must each get their own certificate
    /// back, and a height with no certificate yields `None`.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Create certificates for two different chains at the same height.
        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));

        let mut batch = MultiPartitionBatch::new();

        // Minimal block for chain A at height 10; the hash fields only need to be
        // deterministic, not meaningful.
        let block_a = Block {
            header: BlockHeader {
                chain_id: chain_a,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash_a")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_a")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash_a")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_a")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_a")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_a")),
                events_hash: CryptoHash::new(&TestString::new("events_hash_a")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_a")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_a")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block_a = ConfirmedBlock::new(block_a);
        let cert_a = ConfirmedBlockCertificate::new(confirmed_block_a, Round::Fast, vec![]);
        batch.add_certificate(&cert_a).unwrap();

        // Same shape for chain B, also at height 10, with distinct hash inputs.
        let block_b = Block {
            header: BlockHeader {
                chain_id: chain_b,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash_b")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_b")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash_b")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_b")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_b")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_b")),
                events_hash: CryptoHash::new(&TestString::new("events_hash_b")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_b")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_b")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block_b = ConfirmedBlock::new(block_b);
        let cert_b = ConfirmedBlockCertificate::new(confirmed_block_b, Round::Fast, vec![]);
        batch.add_certificate(&cert_b).unwrap();

        // Both certificates are committed in a single batch.
        storage.write_batch(batch).await.unwrap();

        // Read from chain A - should get cert A, not cert B.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        // Read from chain B - should get cert B, not cert A.
        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // Read from chain A at height 20, where no chain has a certificate -
        // should get None.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }
855
    /// Checks that reading a certificate through the height index returns the
    /// very same certificate as reading it directly by hash.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write a single certificate at height 7.
        let mut batch = MultiPartitionBatch::new();
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(7),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
        let hash = cert.hash();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read it back by hash.
        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();

        // Read it back by (chain, height) through the height index.
        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        // Both paths must yield the identical certificate.
        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }
916
    /// Tests that `write_certificate_height_indices` correctly populates the reverse index
    /// so that `read_certificates_by_heights` can find certificates that were written
    /// without the height index (simulating old data or fallback scenarios).
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_write_certificate_height_indices_populates_reverse_index() {
        use linera_views::{batch::Batch, store::WritableKeyValueStore as _};

        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Create a certificate but write ONLY the certificate data, NOT the height index.
        // This simulates old data written before the height index feature existed.
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
        let hash = cert.hash();
        let height = BlockHeight(10);

        // Write certificate data directly (bypassing add_certificate which writes the index).
        // Both parts of a certificate are stored: the lite certificate and the block value.
        let root_key = RootKey::ConfirmedBlock(hash).bytes();
        let store = storage.database.open_shared(&root_key).unwrap();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(
            crate::db_storage::LITE_CERTIFICATE_KEY.to_vec(),
            bcs::to_bytes(&cert.lite_certificate()).unwrap(),
        );
        batch.put_key_value_bytes(
            crate::db_storage::BLOCK_KEY.to_vec(),
            bcs::to_bytes(&cert.value()).unwrap(),
        );
        store.write_batch(batch).await.unwrap();

        // Verify the height index does NOT exist yet: a lookup by height misses.
        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_none(),
            "Height index should not exist before write_certificate_height_indices"
        );

        // Verify the certificate can still be read by hash (the data itself is there).
        let cert_by_hash = storage.read_certificate(hash).await.unwrap();
        assert!(cert_by_hash.is_some(), "Certificate should exist by hash");

        // Write the height index (simulating what the updater does after a fallback).
        storage
            .write_certificate_height_indices(chain_id, &[(height, hash)])
            .await
            .unwrap();

        // Verify read_certificates_by_heights NOW returns the certificate.
        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_some(),
            "Height index should exist after write_certificate_height_indices"
        );
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            hash,
            "Certificate retrieved by height should match original"
        );
    }
1013}
1014
1015/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
1016/// chain states into the first store.
1017#[derive(Clone, Copy)]
1018pub struct ChainStatesFirstAssignment;
1019
1020impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
1021    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
1022        if root_key.is_empty() {
1023            return Ok(StoreInUse::Second);
1024        }
1025        let store = match is_chain_state(root_key) {
1026            true => StoreInUse::First,
1027            false => StoreInUse::Second,
1028        };
1029        Ok(store)
1030    }
1031}
1032
1033/// A `Clock` implementation using the system clock.
1034#[derive(Clone)]
1035pub struct WallClock;
1036
1037#[cfg_attr(not(web), async_trait)]
1038#[cfg_attr(web, async_trait(?Send))]
1039impl Clock for WallClock {
1040    fn current_time(&self) -> Timestamp {
1041        Timestamp::now()
1042    }
1043
1044    async fn sleep_until(&self, timestamp: Timestamp) {
1045        let delta = timestamp.delta_since(Timestamp::now());
1046        if delta > TimeDelta::ZERO {
1047            linera_base::time::timer::sleep(delta.as_duration()).await
1048        }
1049    }
1050}
1051
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    // The current simulated time.
    time: Timestamp,
    // Pending sleepers, keyed by wake-up time. The `Reverse` wrapper inverts
    // the ordering so that `set` can use `BTreeMap::split_off` to collect all
    // entries whose target time is <= the new time.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback that decides whether to auto-advance for a given target timestamp.
    /// Returns `true` if the clock should auto-advance to that time.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
1061
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the current time and wakes every pending sleeper whose target
    /// time has been reached.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are `Reverse(target)`, so `split_off(&Reverse(time))` removes
        // and returns exactly the entries with `target <= time`.
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        }
    }

    /// Registers a sleep until `time` and returns the receiver that resolves
    /// once the clock reaches that time (possibly immediately).
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        // Ask the optional callback whether this particular sleep should
        // auto-advance the clock instead of blocking.
        let should_auto_advance = self
            .sleep_callback
            .as_ref()
            .is_some_and(|callback| callback(time));
        if should_auto_advance && time > self.time {
            // Auto-advance mode: immediately advance the clock and complete the sleep.
            self.set(time);
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else if self.time >= time {
            // The target time has already passed: complete the sleep at once.
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else {
            // Otherwise queue the sender; `set` fires it once the clock
            // reaches the target time.
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
1093
/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the
/// others, since the inner state is shared behind an `Arc<Mutex<_>>`.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
1099
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    async fn sleep_until(&self, timestamp: Timestamp) {
        // Register the sleeper inside a scope so the mutex guard is dropped
        // before awaiting the receiver.
        let receiver = {
            let mut inner = self.lock();
            inner.add_sleep_until(timestamp)
        };
        // Sender may have been dropped if the clock was dropped; just stop waiting.
        let _ = receiver.await;
    }
}
1114
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        Self(Arc::default())
    }

    /// Sets the current time, waking any sleepers whose target has passed.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta, saturating instead of
    /// overflowing.
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let new_time = inner.time.saturating_add(delta);
        inner.set(new_time);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Sets a callback that decides whether to auto-advance for each sleep call.
    ///
    /// The callback receives the target timestamp and should return `true` if the clock
    /// should auto-advance to that time, or `false` if the sleep should block normally.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        let boxed: Box<dyn Fn(Timestamp) -> bool + Send + Sync> = Box::new(callback);
        self.lock().sleep_callback = Some(boxed);
    }

    /// Clears the sleep callback, restoring normal blocking sleeps.
    pub fn clear_sleep_callback(&self) {
        self.lock().sleep_callback = None;
    }

    /// Locks the shared inner state, panicking only if the mutex was poisoned.
    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
1159
1160#[cfg_attr(not(web), async_trait)]
1161#[cfg_attr(web, async_trait(?Send))]
1162impl<Database, C> Storage for DbStorage<Database, C>
1163where
1164    Database: KeyValueDatabase<
1165            Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
1166            Error: Send + Sync,
1167        > + Clone
1168        + linera_base::util::traits::AutoTraits
1169        + 'static,
1170    C: Clock + Clone + Send + Sync + 'static,
1171{
1172    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
1173    type Clock = C;
1174    type BlockExporterContext = ViewContext<u32, Database::Store>;
1175
    /// Returns the clock used by this storage instance.
    fn clock(&self) -> &C {
        &self.clock
    }
1179
    /// Returns the shared thread pool used for execution work.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
1183
1184    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
1185    async fn load_chain(
1186        &self,
1187        chain_id: ChainId,
1188    ) -> Result<ChainStateView<Self::Context>, ViewError> {
1189        #[cfg(with_metrics)]
1190        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
1191        let runtime_context = ChainRuntimeContext {
1192            storage: self.clone(),
1193            thread_pool: self.thread_pool.clone(),
1194            chain_id,
1195            execution_runtime_config: self.execution_runtime_config,
1196            user_contracts: self.user_contracts.clone(),
1197            user_services: self.user_services.clone(),
1198        };
1199        let root_key = RootKey::ChainState(chain_id).bytes();
1200        let store = self.database.open_exclusive(&root_key)?;
1201        let context = ViewContext::create_root_context(store, runtime_context).await?;
1202        ChainStateView::load(context).await
1203    }
1204
1205    #[instrument(level = "trace", skip_all, fields(%blob_id))]
1206    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1207        if self.caches.blob.contains(&blob_id) {
1208            #[cfg(with_metrics)]
1209            metrics::CONTAINS_BLOB_COUNTER
1210                .with_label_values(&[metrics::CACHE])
1211                .inc();
1212            return Ok(true);
1213        }
1214        let root_key = RootKey::Blob(blob_id).bytes();
1215        let store = self.database.open_shared(&root_key)?;
1216        let test = store.contains_key(BLOB_KEY).await?;
1217        #[cfg(with_metrics)]
1218        metrics::CONTAINS_BLOB_COUNTER
1219            .with_label_values(&[metrics::DB])
1220            .inc();
1221        Ok(test)
1222    }
1223
    /// Returns the ids among `blob_ids` whose blobs are not present in storage.
    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
        let mut missing_blobs = Vec::new();
        // Counters are accumulated locally and flushed to the metric registry
        // once at the end, instead of being incremented inside the loop.
        #[cfg(with_metrics)]
        let mut cache_hits: u64 = 0;
        #[cfg(with_metrics)]
        let mut db_checks: u64 = 0;
        for blob_id in blob_ids {
            // A cached blob is known to be present; skip the database probe.
            if self.caches.blob.contains(blob_id) {
                #[cfg(with_metrics)]
                {
                    cache_hits += 1;
                }
                continue;
            }
            #[cfg(with_metrics)]
            {
                db_checks += 1;
            }
            // Probe the blob's partition for the blob payload key.
            let root_key = RootKey::Blob(*blob_id).bytes();
            let store = self.database.open_shared(&root_key)?;
            if !store.contains_key(BLOB_KEY).await? {
                missing_blobs.push(*blob_id);
            }
        }
        #[cfg(with_metrics)]
        {
            if cache_hits > 0 {
                metrics::CONTAINS_BLOBS_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            if db_checks > 0 {
                metrics::CONTAINS_BLOBS_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_checks);
            }
        }
        Ok(missing_blobs)
    }
1264
1265    #[instrument(skip_all, fields(%blob_id))]
1266    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1267        let root_key = RootKey::Blob(blob_id).bytes();
1268        let store = self.database.open_shared(&root_key)?;
1269        let test = store.contains_key(BLOB_STATE_KEY).await?;
1270        #[cfg(with_metrics)]
1271        metrics::CONTAINS_BLOB_STATE_COUNTER
1272            .with_label_values(&[metrics::DB])
1273            .inc();
1274        Ok(test)
1275    }
1276
1277    #[instrument(skip_all, fields(%hash))]
1278    async fn read_confirmed_block(
1279        &self,
1280        hash: CryptoHash,
1281    ) -> Result<Option<Arc<ConfirmedBlock>>, ViewError> {
1282        if let Some(block) = self.caches.confirmed_block.get(&hash) {
1283            #[cfg(with_metrics)]
1284            metrics::READ_CONFIRMED_BLOCK_COUNTER
1285                .with_label_values(&[metrics::CACHE])
1286                .inc();
1287            return Ok(Some(block));
1288        }
1289        let root_key = RootKey::ConfirmedBlock(hash).bytes();
1290        let store = self.database.open_shared(&root_key)?;
1291        let value = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await?;
1292        #[cfg(with_metrics)]
1293        metrics::READ_CONFIRMED_BLOCK_COUNTER
1294            .with_label_values(&[metrics::DB])
1295            .inc();
1296        match value {
1297            Some(block) => Ok(Some(self.caches.confirmed_block.insert(&hash, block))),
1298            None => Ok(None),
1299        }
1300    }
1301
    /// Reads several confirmed blocks, serving as many as possible from the
    /// cache and fetching only the misses from the database. The returned
    /// vector is positional: entry `i` corresponds to the `i`-th input hash.
    #[instrument(skip_all)]
    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
        &self,
        hashes: I,
    ) -> Result<Vec<Option<Arc<ConfirmedBlock>>>, ViewError> {
        let hashes = hashes.into_iter().collect::<Vec<_>>();
        if hashes.is_empty() {
            return Ok(Vec::new());
        }
        let mut results = vec![None; hashes.len()];
        // First pass: fill from the cache and record the indices of misses.
        let mut misses = Vec::new();
        for (i, hash) in hashes.iter().enumerate() {
            if let Some(block) = self.caches.confirmed_block.get(hash) {
                results[i] = Some(block);
            } else {
                misses.push(i);
            }
        }
        if !misses.is_empty() {
            // Second pass: read each missing block from its partition and
            // insert it into the cache; blocks absent from storage stay `None`.
            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
                let store = self.database.open_shared(&root_key)?;
                if let Some(block) = store.read_value::<ConfirmedBlock>(BLOCK_KEY).await? {
                    results[*miss_idx] = Some(
                        self.caches
                            .confirmed_block
                            .insert(&hashes[*miss_idx], block),
                    );
                }
            }
        }
        #[cfg(with_metrics)]
        {
            // Report hit/miss counts in bulk rather than per item.
            let cache_hits = (hashes.len() - misses.len()) as u64;
            if cache_hits > 0 {
                metrics::READ_CONFIRMED_BLOCKS_COUNTER
                    .with_label_values(&[metrics::CACHE])
                    .inc_by(cache_hits);
            }
            let db_reads = misses.len() as u64;
            if db_reads > 0 {
                metrics::READ_CONFIRMED_BLOCKS_COUNTER
                    .with_label_values(&[metrics::DB])
                    .inc_by(db_reads);
            }
        }
        Ok(results)
    }
1351
1352    #[instrument(skip_all, fields(%blob_id))]
1353    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Arc<Blob>>, ViewError> {
1354        if let Some(blob) = self.caches.blob.get(&blob_id) {
1355            #[cfg(with_metrics)]
1356            metrics::READ_BLOB_COUNTER
1357                .with_label_values(&[metrics::CACHE])
1358                .inc();
1359            return Ok(Some(blob));
1360        }
1361        let root_key = RootKey::Blob(blob_id).bytes();
1362        let store = self.database.open_shared(&root_key)?;
1363        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
1364        #[cfg(with_metrics)]
1365        metrics::READ_BLOB_COUNTER
1366            .with_label_values(&[metrics::DB])
1367            .inc();
1368        match maybe_blob_bytes {
1369            Some(blob_bytes) => {
1370                let blob = Blob::new_with_id_unchecked(blob_id, blob_bytes);
1371                Ok(Some(self.caches.blob.insert(&blob_id, blob)))
1372            }
1373            None => Ok(None),
1374        }
1375    }
1376
1377    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1378    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Arc<Blob>>>, ViewError> {
1379        if blob_ids.is_empty() {
1380            return Ok(Vec::new());
1381        }
1382        let mut blobs = Vec::new();
1383        for blob_id in blob_ids {
1384            blobs.push(self.read_blob(*blob_id).await?);
1385        }
1386        Ok(blobs)
1387    }
1388
1389    #[instrument(skip_all, fields(%blob_id))]
1390    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
1391        let root_key = RootKey::Blob(blob_id).bytes();
1392        let store = self.database.open_shared(&root_key)?;
1393        let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1394        #[cfg(with_metrics)]
1395        metrics::READ_BLOB_STATE_COUNTER
1396            .with_label_values(&[metrics::DB])
1397            .inc();
1398        Ok(blob_state)
1399    }
1400
1401    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1402    async fn read_blob_states(
1403        &self,
1404        blob_ids: &[BlobId],
1405    ) -> Result<Vec<Option<BlobState>>, ViewError> {
1406        if blob_ids.is_empty() {
1407            return Ok(Vec::new());
1408        }
1409        let mut blob_states = Vec::new();
1410        for blob_id in blob_ids {
1411            blob_states.push(self.read_blob_state(*blob_id).await?);
1412        }
1413        Ok(blob_states)
1414    }
1415
1416    #[instrument(skip_all, fields(blob_id = %blob.id()))]
1417    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
1418        let mut batch = MultiPartitionBatch::new();
1419        batch.add_blob(blob);
1420        self.write_batch(batch).await?;
1421        Ok(())
1422    }
1423
1424    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1425    async fn maybe_write_blob_states(
1426        &self,
1427        blob_ids: &[BlobId],
1428        blob_state: BlobState,
1429    ) -> Result<(), ViewError> {
1430        if blob_ids.is_empty() {
1431            return Ok(());
1432        }
1433        let mut maybe_blob_states = Vec::new();
1434        for blob_id in blob_ids {
1435            let root_key = RootKey::Blob(*blob_id).bytes();
1436            let store = self.database.open_shared(&root_key)?;
1437            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1438            maybe_blob_states.push(maybe_blob_state);
1439        }
1440        let mut batch = MultiPartitionBatch::new();
1441        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
1442            match maybe_blob_state {
1443                None => {
1444                    batch.add_blob_state(*blob_id, &blob_state)?;
1445                }
1446                Some(state) => {
1447                    if state.epoch < blob_state.epoch {
1448                        batch.add_blob_state(*blob_id, &blob_state)?;
1449                    }
1450                }
1451            }
1452        }
1453        // We tolerate race conditions because two active chains are likely to
1454        // be both from the latest epoch, and otherwise failing to pick the
1455        // more recent blob state has limited impact.
1456        self.write_batch(batch).await?;
1457        Ok(())
1458    }
1459
1460    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1461    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
1462        if blobs.is_empty() {
1463            return Ok(Vec::new());
1464        }
1465        let mut batch = MultiPartitionBatch::new();
1466        let mut blob_states = Vec::new();
1467        for blob in blobs {
1468            let root_key = RootKey::Blob(blob.id()).bytes();
1469            let store = self.database.open_shared(&root_key)?;
1470            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
1471            blob_states.push(has_state);
1472            if has_state {
1473                batch.add_blob(blob);
1474            }
1475        }
1476        self.write_batch(batch).await?;
1477        Ok(blob_states)
1478    }
1479
1480    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1481    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
1482        if blobs.is_empty() {
1483            return Ok(());
1484        }
1485        let mut batch = MultiPartitionBatch::new();
1486        for blob in blobs {
1487            batch.add_blob(blob);
1488        }
1489        self.write_batch(batch).await
1490    }
1491
1492    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1493    async fn write_blobs_and_certificate(
1494        &self,
1495        blobs: &[Blob],
1496        certificate: &ConfirmedBlockCertificate,
1497    ) -> Result<(), ViewError> {
1498        let mut batch = MultiPartitionBatch::new();
1499        for blob in blobs {
1500            batch.add_blob(blob);
1501        }
1502        batch.add_certificate(certificate)?;
1503        self.write_batch(batch).await
1504    }
1505
1506    #[instrument(skip_all, fields(%hash))]
1507    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1508        if self.caches.certificate.contains(&hash) || self.caches.certificate_raw.contains(&hash) {
1509            #[cfg(with_metrics)]
1510            metrics::CONTAINS_CERTIFICATE_COUNTER
1511                .with_label_values(&[metrics::CACHE])
1512                .inc();
1513            return Ok(true);
1514        }
1515        let root_key = RootKey::ConfirmedBlock(hash).bytes();
1516        let store = self.database.open_shared(&root_key)?;
1517        let results = store.contains_keys(&get_block_keys()).await?;
1518        #[cfg(with_metrics)]
1519        metrics::CONTAINS_CERTIFICATE_COUNTER
1520            .with_label_values(&[metrics::DB])
1521            .inc();
1522        Ok(results[0] && results[1])
1523    }
1524
    /// Reads a certificate by hash, trying three tiers in order: the
    /// assembled-certificate cache, the raw-bytes cache, and finally the
    /// database (whose result is inserted into the raw-bytes cache).
    #[instrument(skip_all, fields(%hash))]
    async fn read_certificate(
        &self,
        hash: CryptoHash,
    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
        // Tier 1: assembled certificate cache (single Arc, no re-assembly).
        if let Some(cert) = self.caches.certificate.get(&hash) {
            #[cfg(with_metrics)]
            metrics::READ_CERTIFICATE_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return Ok(Some(cert));
        }
        // Tier 2: raw bytes cache — deserialize + populate caches.
        if let Some(raw) = self.caches.certificate_raw.get(&hash) {
            #[cfg(with_metrics)]
            metrics::READ_CERTIFICATE_COUNTER
                .with_label_values(&[metrics::CACHE])
                .inc();
            return self.deserialize_and_cache_certificate(&raw.0, &raw.1);
        }
        // Tier 3: the database. Both parts (lite certificate + block) must be
        // present for the certificate to count as stored.
        let root_key = RootKey::ConfirmedBlock(hash).bytes();
        let store = self.database.open_shared(&root_key)?;
        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
        #[cfg(with_metrics)]
        metrics::READ_CERTIFICATE_COUNTER
            .with_label_values(&[metrics::DB])
            .inc();
        let Some(lite_cert_bytes) = values[0].as_ref() else {
            return Ok(None);
        };
        let Some(confirmed_block_bytes) = values[1].as_ref() else {
            return Ok(None);
        };
        // Populate the raw-bytes cache before deserializing.
        self.caches.certificate_raw.insert(
            &hash,
            (lite_cert_bytes.clone(), confirmed_block_bytes.clone()),
        );
        self.deserialize_and_cache_certificate(lite_cert_bytes, confirmed_block_bytes)
    }
1566
1567    #[instrument(skip_all)]
1568    async fn read_certificates(
1569        &self,
1570        hashes: &[CryptoHash],
1571    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1572        let raw_certs = self.read_certificates_raw(hashes).await?;
1573
1574        raw_certs
1575            .into_iter()
1576            .map(|maybe_raw| {
1577                let Some(raw) = maybe_raw else {
1578                    return Ok(None);
1579                };
1580                self.deserialize_and_cache_certificate(&raw.0, &raw.1)
1581            })
1582            .collect()
1583    }
1584
1585    #[instrument(skip_all)]
1586    async fn read_certificates_raw(
1587        &self,
1588        hashes: &[CryptoHash],
1589    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1590        if hashes.is_empty() {
1591            return Ok(Vec::new());
1592        }
1593        let mut results = vec![None; hashes.len()];
1594        let mut misses = Vec::new();
1595        for (i, hash) in hashes.iter().enumerate() {
1596            if let Some(raw) = self.caches.certificate_raw.get(hash) {
1597                results[i] = Some(raw);
1598            } else {
1599                misses.push(i);
1600            }
1601        }
1602        if !misses.is_empty() {
1603            let miss_hashes: Vec<_> = misses.iter().map(|&i| hashes[i]).collect();
1604            let root_keys = Self::get_root_keys_for_certificates(&miss_hashes);
1605            for (miss_idx, root_key) in misses.iter().zip(root_keys) {
1606                let store = self.database.open_shared(&root_key)?;
1607                let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1608                if let (Some(lite), Some(block)) = (values[0].as_ref(), values[1].as_ref()) {
1609                    results[*miss_idx] = Some(
1610                        self.caches
1611                            .certificate_raw
1612                            .insert(&hashes[*miss_idx], (lite.clone(), block.clone())),
1613                    );
1614                }
1615            }
1616        }
1617        #[cfg(with_metrics)]
1618        {
1619            let cache_hits = (hashes.len() - misses.len()) as u64;
1620            if cache_hits > 0 {
1621                metrics::READ_CERTIFICATES_COUNTER
1622                    .with_label_values(&[metrics::CACHE])
1623                    .inc_by(cache_hits);
1624            }
1625            let db_reads = misses.len() as u64;
1626            if db_reads > 0 {
1627                metrics::READ_CERTIFICATES_COUNTER
1628                    .with_label_values(&[metrics::DB])
1629                    .inc_by(db_reads);
1630            }
1631        }
1632        Ok(results)
1633    }
1634
1635    async fn read_certificate_hashes_by_heights(
1636        &self,
1637        chain_id: ChainId,
1638        heights: &[BlockHeight],
1639    ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1640        if heights.is_empty() {
1641            return Ok(Vec::new());
1642        }
1643
1644        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1645        let store = self.database.open_shared(&index_root_key)?;
1646        let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
1647        let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
1648        let hash_options: Vec<Option<CryptoHash>> = hash_bytes
1649            .into_iter()
1650            .map(|opt| {
1651                opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
1652                    .transpose()
1653            })
1654            .collect::<Result<_, _>>()?;
1655
1656        Ok(hash_options)
1657    }
1658
1659    #[instrument(skip_all)]
1660    async fn read_certificates_by_heights_raw(
1661        &self,
1662        chain_id: ChainId,
1663        heights: &[BlockHeight],
1664    ) -> Result<Vec<Option<Arc<(Vec<u8>, Vec<u8>)>>>, ViewError> {
1665        let hashes: Vec<Option<CryptoHash>> = self
1666            .read_certificate_hashes_by_heights(chain_id, heights)
1667            .await?;
1668
1669        // Map from hash to all indices in the heights array (handles duplicates)
1670        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
1671        for (index, maybe_hash) in hashes.iter().enumerate() {
1672            if let Some(hash) = maybe_hash {
1673                indices.entry(*hash).or_default().push(index);
1674            }
1675        }
1676
1677        // Deduplicate hashes for the storage query
1678        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
1679
1680        let mut result = vec![None; heights.len()];
1681
1682        for (raw_cert, hash) in self
1683            .read_certificates_raw(&unique_hashes)
1684            .await?
1685            .into_iter()
1686            .zip(unique_hashes)
1687        {
1688            if let Some(idx_list) = indices.get(&hash) {
1689                for &index in idx_list {
1690                    result[index] = raw_cert.clone();
1691                }
1692            } else {
1693                // This should not happen, but log a warning if it does.
1694                tracing::warn!(
1695                    hash=?hash,
1696                    "certificate hash not found in indices map",
1697                );
1698            }
1699        }
1700
1701        Ok(result)
1702    }
1703
1704    #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1705    async fn read_certificates_by_heights(
1706        &self,
1707        chain_id: ChainId,
1708        heights: &[BlockHeight],
1709    ) -> Result<Vec<Option<Arc<ConfirmedBlockCertificate>>>, ViewError> {
1710        self.read_certificates_by_heights_raw(chain_id, heights)
1711            .await?
1712            .into_iter()
1713            .map(|maybe_raw| match maybe_raw {
1714                None => Ok(None),
1715                Some(raw) => self.deserialize_and_cache_certificate(&raw.0, &raw.1),
1716            })
1717            .collect()
1718    }
1719
1720    #[instrument(skip_all, fields(%chain_id, indices_len = indices.len()))]
1721    async fn write_certificate_height_indices(
1722        &self,
1723        chain_id: ChainId,
1724        indices: &[(BlockHeight, CryptoHash)],
1725    ) -> Result<(), ViewError> {
1726        if indices.is_empty() {
1727            return Ok(());
1728        }
1729
1730        let mut batch = MultiPartitionBatch::new();
1731        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1732        let key_values: Vec<(Vec<u8>, Vec<u8>)> = indices
1733            .iter()
1734            .map(|(height, hash)| {
1735                let height_key = to_height_key(*height);
1736                let hash_value = bcs::to_bytes(hash).unwrap();
1737                (height_key, hash_value)
1738            })
1739            .collect();
1740        batch.put_key_values(index_root_key, key_values);
1741        self.write_batch(batch).await
1742    }
1743
1744    #[instrument(skip_all, fields(event_id = ?event_id))]
1745    async fn read_event(&self, event_id: EventId) -> Result<Option<Arc<Vec<u8>>>, ViewError> {
1746        if let Some(event) = self.caches.event.get(&event_id) {
1747            #[cfg(with_metrics)]
1748            metrics::READ_EVENT_COUNTER
1749                .with_label_values(&[metrics::CACHE])
1750                .inc();
1751            return Ok(Some(event));
1752        }
1753        let event_key = to_event_key(&event_id);
1754        let root_key = RootKey::Event(event_id.chain_id).bytes();
1755        let store = self.database.open_shared(&root_key)?;
1756        let event = store.read_value_bytes(&event_key).await?;
1757        #[cfg(with_metrics)]
1758        metrics::READ_EVENT_COUNTER
1759            .with_label_values(&[metrics::DB])
1760            .inc();
1761        match event {
1762            Some(event_bytes) => Ok(Some(self.caches.event.insert(&event_id, event_bytes))),
1763            None => Ok(None),
1764        }
1765    }
1766
1767    #[instrument(skip_all, fields(event_id = ?event_id))]
1768    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1769        if self.caches.event.contains(&event_id) {
1770            #[cfg(with_metrics)]
1771            metrics::CONTAINS_EVENT_COUNTER
1772                .with_label_values(&[metrics::CACHE])
1773                .inc();
1774            return Ok(true);
1775        }
1776        let event_key = to_event_key(&event_id);
1777        let root_key = RootKey::Event(event_id.chain_id).bytes();
1778        let store = self.database.open_shared(&root_key)?;
1779        let exists = store.contains_key(&event_key).await?;
1780        #[cfg(with_metrics)]
1781        metrics::CONTAINS_EVENT_COUNTER
1782            .with_label_values(&[metrics::DB])
1783            .inc();
1784        Ok(exists)
1785    }
1786
1787    #[instrument(skip_all, fields(%chain_id, %stream_id, %start_index))]
1788    async fn read_events_from_index(
1789        &self,
1790        chain_id: &ChainId,
1791        stream_id: &StreamId,
1792        start_index: u32,
1793    ) -> Result<Vec<IndexAndEvent>, ViewError> {
1794        let root_key = RootKey::Event(*chain_id).bytes();
1795        let store = self.database.open_shared(&root_key)?;
1796        let mut keys = Vec::new();
1797        let mut indices = Vec::new();
1798        let prefix = bcs::to_bytes(stream_id).unwrap();
1799        for short_key in store.find_keys_by_prefix(&prefix).await? {
1800            let index = bcs::from_bytes::<u32>(&short_key)?;
1801            if index >= start_index {
1802                let mut key = prefix.clone();
1803                key.extend(short_key);
1804                keys.push(key);
1805                indices.push(index);
1806            }
1807        }
1808        let values = store.read_multi_values_bytes(&keys).await?;
1809        let mut returned_values = Vec::new();
1810        for (index, value) in indices.into_iter().zip(values) {
1811            let event = value.unwrap();
1812            returned_values.push(IndexAndEvent { index, event });
1813        }
1814        Ok(returned_values)
1815    }
1816
1817    #[instrument(skip_all)]
1818    async fn write_events(
1819        &self,
1820        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1821    ) -> Result<(), ViewError> {
1822        let mut batch = MultiPartitionBatch::new();
1823        for (event_id, value) in events {
1824            batch.add_event(&event_id, value);
1825        }
1826        self.write_batch(batch).await
1827    }
1828
1829    #[instrument(skip_all)]
1830    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1831        let root_key = RootKey::NetworkDescription.bytes();
1832        let store = self.database.open_shared(&root_key)?;
1833        let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1834        #[cfg(with_metrics)]
1835        metrics::READ_NETWORK_DESCRIPTION
1836            .with_label_values(&[metrics::DB])
1837            .inc();
1838        Ok(maybe_value)
1839    }
1840
1841    #[instrument(skip_all)]
1842    async fn write_network_description(
1843        &self,
1844        information: &NetworkDescription,
1845    ) -> Result<(), ViewError> {
1846        let mut batch = MultiPartitionBatch::new();
1847        batch.add_network_description(information)?;
1848        self.write_batch(batch).await?;
1849        Ok(())
1850    }
1851
    /// Returns the shared committees held by this storage instance.
    fn shared_committees(&self) -> &SharedCommittees {
        &self.shared_committees
    }
1855
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1859
    /// Creates a root view context over the partition reserved for the given
    /// block exporter.
    #[instrument(skip_all)]
    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError> {
        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
        // Opened exclusively: each exporter owns its state partition.
        let store = self.database.open_exclusive(&root_key)?;
        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
    }
1869}
1870
1871impl<Database, C> DbStorage<Database, C>
1872where
1873    Database: KeyValueDatabase + Clone,
1874    Database::Store: KeyValueStore + Clone,
1875    C: Clock,
1876    Database::Error: Send + Sync,
1877{
1878    #[instrument(skip_all)]
1879    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1880        hashes
1881            .iter()
1882            .map(|hash| RootKey::ConfirmedBlock(*hash).bytes())
1883            .collect()
1884    }
1885
1886    fn deserialize_and_cache_certificate(
1887        &self,
1888        lite_cert_bytes: &[u8],
1889        confirmed_block_bytes: &[u8],
1890    ) -> Result<Option<Arc<ConfirmedBlockCertificate>>, ViewError> {
1891        let lite = bcs::from_bytes::<LiteCertificate>(lite_cert_bytes)?;
1892        let block = bcs::from_bytes::<ConfirmedBlock>(confirmed_block_bytes)?;
1893        let hash = block.hash();
1894        self.caches.confirmed_block.insert(&hash, block.clone());
1895        let certificate = lite
1896            .with_value(block)
1897            .ok_or(ViewError::InconsistentEntries)?;
1898        let arc = self.caches.certificate.insert(&hash, certificate);
1899        Ok(Some(arc))
1900    }
1901
1902    #[instrument(skip_all)]
1903    async fn write_entry(
1904        store: &Database::Store,
1905        key_values: Vec<(Vec<u8>, Vec<u8>)>,
1906    ) -> Result<(), ViewError> {
1907        let mut batch = Batch::new();
1908        for (key, value) in key_values {
1909            batch.put_key_value_bytes(key, value);
1910        }
1911        store.write_batch(batch).await?;
1912        Ok(())
1913    }
1914
1915    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1916    pub(crate) async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1917        if batch.keys_value_bytes.is_empty() {
1918            return Ok(());
1919        }
1920        let mut futures = Vec::new();
1921        for (root_key, key_values) in batch.keys_value_bytes {
1922            let store = self.database.open_shared(&root_key)?;
1923            futures.push(async move { Self::write_entry(&store, key_values).await });
1924        }
1925        futures::future::try_join_all(futures).await?;
1926        Ok(())
1927    }
1928}
1929
1930impl<Database, C> DbStorage<Database, C>
1931where
1932    Database: KeyValueDatabase + Clone + 'static,
1933    Database::Error: Send + Sync,
1934    Database::Store: KeyValueStore + Clone + 'static,
1935    C: Clock + Clone + Send + Sync + 'static,
1936{
1937    pub(crate) fn new(
1938        database: Database,
1939        wasm_runtime: Option<WasmRuntime>,
1940        cache_sizes: StorageCacheConfig,
1941        clock: C,
1942    ) -> Self {
1943        Self {
1944            database: Arc::new(database),
1945            clock,
1946            // The `Arc` here is required on native but useless on the Web.
1947            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
1948            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
1949            wasm_runtime,
1950            user_contracts: Arc::new(papaya::HashMap::new()),
1951            user_services: Arc::new(papaya::HashMap::new()),
1952            shared_committees: SharedCommittees::new(),
1953            caches: StorageCaches::new(cache_sizes),
1954            execution_runtime_config: ExecutionRuntimeConfig::default(),
1955        }
1956    }
1957
1958    /// Sets whether contract log messages should be output.
1959    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
1960        self.execution_runtime_config.allow_application_logs = allow;
1961        self
1962    }
1963}
1964
1965impl<Database> DbStorage<Database, WallClock>
1966where
1967    Database: KeyValueDatabase + Clone + 'static,
1968    Database::Error: Send + Sync,
1969    Database::Store: KeyValueStore + Clone + 'static,
1970{
1971    pub async fn maybe_create_and_connect(
1972        config: &Database::Config,
1973        namespace: &str,
1974        wasm_runtime: Option<WasmRuntime>,
1975        cache_sizes: StorageCacheConfig,
1976    ) -> Result<Self, ViewError> {
1977        let database = Database::maybe_create_and_connect(config, namespace).await?;
1978        let storage = Self::new(database, wasm_runtime, cache_sizes, WallClock);
1979        Ok(storage)
1980    }
1981
1982    pub async fn connect(
1983        config: &Database::Config,
1984        namespace: &str,
1985        wasm_runtime: Option<WasmRuntime>,
1986        cache_sizes: StorageCacheConfig,
1987    ) -> Result<Self, ViewError> {
1988        let database = Database::connect(config, namespace).await?;
1989        let storage = Self::new(database, wasm_runtime, cache_sizes, WallClock);
1990        Ok(storage)
1991    }
1992
1993    /// Lists the blob IDs of the storage.
1994    pub async fn list_blob_ids(
1995        config: &Database::Config,
1996        namespace: &str,
1997    ) -> Result<Vec<BlobId>, ViewError> {
1998        let database = Database::connect(config, namespace).await?;
1999        let root_keys = database.list_root_keys().await?;
2000        let mut blob_ids = Vec::new();
2001        for root_key in root_keys {
2002            if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
2003                let root_key_red = &root_key[1..];
2004                let blob_id = bcs::from_bytes(root_key_red)?;
2005                blob_ids.push(blob_id);
2006            }
2007        }
2008        Ok(blob_ids)
2009    }
2010}
2011
2012impl<Database> DbStorage<Database, WallClock>
2013where
2014    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
2015    Database::Error: Send + Sync,
2016{
2017    /// Lists the chain IDs of the storage.
2018    pub async fn list_chain_ids(
2019        config: &Database::Config,
2020        namespace: &str,
2021    ) -> Result<Vec<ChainId>, ViewError> {
2022        let database = Database::connect(config, namespace).await?;
2023        let root_keys = database.list_root_keys().await?;
2024        let mut chain_ids = Vec::new();
2025        for root_key in root_keys {
2026            if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
2027                let root_key_red = &root_key[1..];
2028                let chain_id = bcs::from_bytes(root_key_red)?;
2029                chain_ids.push(chain_id);
2030            }
2031        }
2032        Ok(chain_ids)
2033    }
2034}
2035
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a storage over a fresh test namespace, panicking on failure.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        Self::new_for_testing(config, &namespace, wasm_runtime, TestClock::new())
            .await
            .unwrap()
    }

    /// Recreates the namespace and builds a storage with the default cache
    /// configuration and the given test clock, checking that the storage
    /// layout is migrated.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, ViewError> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        let storage = Self::new(database, wasm_runtime, DEFAULT_STORAGE_CACHE_CONFIG, clock);
        storage.assert_is_migrated_storage().await?;
        Ok(storage)
    }
}