// linera_storage/db_storage.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::{collections::HashMap, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10    crypto::CryptoHash,
11    data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
12    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_chain::{
15    types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
16    ChainStateView,
17};
18use linera_execution::{
19    BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
20};
21use linera_views::{
22    backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
23    batch::Batch,
24    context::ViewContext,
25    store::{
26        KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
27    },
28    views::View,
29    ViewError,
30};
31use serde::{Deserialize, Serialize};
32use tracing::instrument;
33#[cfg(with_testing)]
34use {
35    futures::channel::oneshot::{self, Receiver},
36    linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
37    std::{cmp::Reverse, collections::BTreeMap},
38};
39
40use crate::{ChainRuntimeContext, Clock, Storage};
41
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            // Fixed grammar in the registered help text ("certificate are" -> "certificates are").
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    // NOTE(review): the metric name "network_description" lacks the "read_" prefix used by
    // every sibling read metric (e.g. "read_certificate"). Renaming it would break existing
    // dashboards/alerts, so it is left unchanged here — consider a coordinated rename.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
238
/// The key used for blobs. The Blob ID itself is contained in the root key.
pub(crate) const BLOB_KEY: &[u8] = &[42];

/// The key used for blob states. The Blob ID itself is contained in the root key.
pub(crate) const BLOB_STATE_KEY: &[u8] = &[49];

/// The key used for lite certificates. The cryptohash itself is contained in the root key.
pub(crate) const LITE_CERTIFICATE_KEY: &[u8] = &[91];

/// The key used for confirmed blocks. The cryptohash itself is contained in the root key.
pub(crate) const BLOCK_KEY: &[u8] = &[221];

/// The key used for the network description.
pub(crate) const NETWORK_DESCRIPTION_KEY: &[u8] = &[119];

/// Returns the sub-keys under which a confirmed block's data lives:
/// the lite certificate first, then the block itself.
fn get_block_keys() -> Vec<Vec<u8>> {
    [LITE_CERTIFICATE_KEY, BLOCK_KEY]
        .into_iter()
        .map(|key| key.to_vec())
        .collect()
}
257
/// A batch of key/value writes grouped by root key (partition), so one logical
/// write can touch several partitions.
#[derive(Default)]
#[allow(clippy::type_complexity)]
pub(crate) struct MultiPartitionBatch {
    // Each entry pairs a partition's root key with the key/value pairs to write
    // under it. Entries accumulate in call order; NOTE(review): duplicate root
    // keys are not merged — confirm the writer tolerates repeated partitions.
    keys_value_bytes: Vec<(Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>)>,
}
263
264impl MultiPartitionBatch {
265    pub(crate) fn new() -> Self {
266        Self::default()
267    }
268
269    pub(crate) fn put_key_values(
270        &mut self,
271        root_key: Vec<u8>,
272        key_values: Vec<(Vec<u8>, Vec<u8>)>,
273    ) {
274        self.keys_value_bytes.push((root_key, key_values));
275    }
276
277    pub(crate) fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
278        self.put_key_values(root_key, vec![(key, value)]);
279    }
280
281    fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
282        #[cfg(with_metrics)]
283        metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
284        let root_key = RootKey::Blob(blob.id()).bytes();
285        let key = BLOB_KEY.to_vec();
286        self.put_key_value(root_key, key, blob.bytes().to_vec());
287        Ok(())
288    }
289
290    fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
291        let root_key = RootKey::Blob(blob_id).bytes();
292        let key = BLOB_STATE_KEY.to_vec();
293        let value = bcs::to_bytes(blob_state)?;
294        self.put_key_value(root_key, key, value);
295        Ok(())
296    }
297
298    /// Adds a certificate to the batch.
299    ///
300    /// Writes both the certificate data (indexed by hash) and a height index
301    /// (mapping chain_id + height to hash).
302    ///
303    /// Note: If called multiple times with the same `(chain_id, height)`, the height
304    /// index will be overwritten. The caller is responsible for ensuring that
305    /// certificates at the same height have the same hash.
306    fn add_certificate(
307        &mut self,
308        certificate: &ConfirmedBlockCertificate,
309    ) -> Result<(), ViewError> {
310        #[cfg(with_metrics)]
311        metrics::WRITE_CERTIFICATE_COUNTER
312            .with_label_values(&[])
313            .inc();
314        let hash = certificate.hash();
315
316        // Write certificate data by hash
317        let root_key = RootKey::ConfirmedBlock(hash).bytes();
318        let mut key_values = Vec::new();
319        let key = LITE_CERTIFICATE_KEY.to_vec();
320        let value = bcs::to_bytes(&certificate.lite_certificate())?;
321        key_values.push((key, value));
322        let key = BLOCK_KEY.to_vec();
323        let value = bcs::to_bytes(&certificate.value())?;
324        key_values.push((key, value));
325        self.put_key_values(root_key, key_values);
326
327        // Write height index: chain_id -> height -> hash
328        let chain_id = certificate.value().block().header.chain_id;
329        let height = certificate.value().block().header.height;
330        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
331        let height_key = to_height_key(height);
332        let index_value = bcs::to_bytes(&hash)?;
333        self.put_key_value(index_root_key, height_key, index_value);
334
335        Ok(())
336    }
337
338    fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
339        #[cfg(with_metrics)]
340        metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
341        let key = to_event_key(&event_id);
342        let root_key = RootKey::Event(event_id.chain_id).bytes();
343        self.put_key_value(root_key, key, value);
344        Ok(())
345    }
346
347    fn add_network_description(
348        &mut self,
349        information: &NetworkDescription,
350    ) -> Result<(), ViewError> {
351        #[cfg(with_metrics)]
352        metrics::WRITE_NETWORK_DESCRIPTION
353            .with_label_values(&[])
354            .inc();
355        let root_key = RootKey::NetworkDescription.bytes();
356        let key = NETWORK_DESCRIPTION_KEY.to_vec();
357        let value = bcs::to_bytes(information)?;
358        self.put_key_value(root_key, key, value);
359        Ok(())
360    }
361}
362
/// Main implementation of the [`Storage`] trait.
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    // The underlying key-value database, shared between clones of this storage.
    pub(crate) database: Arc<Database>,
    // Time source; defaults to `WallClock`, tests use `TestClock`.
    clock: Clock,
    // Shared thread pool — presumably for offloading CPU-bound execution work;
    // TODO(review) confirm against its usage elsewhere in this file.
    thread_pool: Arc<linera_execution::ThreadPool>,
    // The Wasm runtime to use for user code, if any.
    wasm_runtime: Option<WasmRuntime>,
    // Map from application ID to its contract code (shared, concurrent).
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    // Map from application ID to its service code (shared, concurrent).
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    // Configuration for the execution runtime.
    execution_runtime_config: ExecutionRuntimeConfig,
}
374
/// The root keys (partition prefixes) used by the storage.
///
/// NOTE(review): the BCS encoding of this enum starts with the variant index, so
/// variant order is a storage-format invariant (see `CHAIN_ID_TAG` = 0 and
/// `BLOB_ID_TAG` = 2). Only append new variants; never reorder or remove them.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum RootKey {
    /// Partition holding the state of one chain.
    ChainState(ChainId),
    /// Partition holding a confirmed block and its lite certificate, by hash.
    ConfirmedBlock(CryptoHash),
    /// Partition holding a blob and its blob state, by blob ID.
    Blob(BlobId),
    /// Partition holding a chain's events, keyed by stream ID and index.
    Event(ChainId),
    /// Reserved, unused variant kept to preserve the variant numbering.
    Placeholder,
    /// Singleton partition for the network description.
    NetworkDescription,
    /// Partition for a block exporter's state, by exporter index.
    BlockExporterState(u32),
    /// Partition indexing a chain's confirmed block hashes by height.
    BlockByHeight(ChainId),
}
386
// BCS variant index of `RootKey::ChainState` (must match the enum order above).
const CHAIN_ID_TAG: u8 = 0;
// BCS variant index of `RootKey::Blob` (must match the enum order above).
const BLOB_ID_TAG: u8 = 2;
389
390impl RootKey {
391    pub(crate) fn bytes(&self) -> Vec<u8> {
392        bcs::to_bytes(self).unwrap()
393    }
394}
395
/// An [`EventId`] without its chain ID, used as the per-partition event key:
/// the chain ID is already encoded in the partition's root key.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RestrictedEventId {
    // Stream the event belongs to; serialized first, so all events of one
    // stream share a common key prefix.
    pub stream_id: StreamId,
    // Index of the event within the stream.
    pub index: u32,
}
401
402pub(crate) fn to_event_key(event_id: &EventId) -> Vec<u8> {
403    let restricted_event_id = RestrictedEventId {
404        stream_id: event_id.stream_id.clone(),
405        index: event_id.index,
406    };
407    bcs::to_bytes(&restricted_event_id).unwrap()
408}
409
/// Serializes a block height into the key used by the `BlockByHeight` index.
///
/// NOTE(review): BCS presumably encodes integers little-endian, in which case
/// these keys are fine for point lookups but NOT order-preserving — confirm
/// before adding any range scan by height over this index.
pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
    bcs::to_bytes(&height).unwrap()
}
413
/// Returns whether `root_key` addresses a chain-state partition, i.e. whether
/// its first byte is the BCS tag of `RootKey::ChainState`.
fn is_chain_state(root_key: &[u8]) -> bool {
    matches!(root_key.first(), Some(&CHAIN_ID_TAG))
}
420
421#[cfg(test)]
422mod tests {
423    use linera_base::{
424        crypto::{CryptoHash, TestString},
425        data_types::{BlockHeight, Epoch, Round, Timestamp},
426        identifiers::{
427            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
428            StreamName,
429        },
430    };
431    use linera_chain::{
432        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
433        types::ConfirmedBlockCertificate,
434    };
435    use linera_views::{
436        memory::MemoryDatabase,
437        store::{KeyValueDatabase, ReadableKeyValueStore as _},
438    };
439
440    use crate::{
441        db_storage::{
442            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
443        },
444        DbStorage, Storage, TestClock,
445    };
446
447    // Several functionalities of the storage rely on the way that the serialization
448    // is done. Thus we need to check that the serialization works in the way that
449    // we expect.
450
451    // The listing of the blobs in `list_blob_ids` depends on the serialization
452    // of `RootKey::Blob`.
453    #[test]
454    fn test_root_key_blob_serialization() {
455        let hash = CryptoHash::default();
456        let blob_type = BlobType::default();
457        let blob_id = BlobId::new(hash, blob_type);
458        let root_key = RootKey::Blob(blob_id).bytes();
459        assert_eq!(root_key[0], BLOB_ID_TAG);
460        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
461    }
462
463    // The listing of the chains in `list_chain_ids` depends on the serialization
464    // of `RootKey::ChainState`.
465    #[test]
466    fn test_root_key_chainstate_serialization() {
467        let hash = CryptoHash::default();
468        let chain_id = ChainId(hash);
469        let root_key = RootKey::ChainState(chain_id).bytes();
470        assert_eq!(root_key[0], CHAIN_ID_TAG);
471        assert_eq!(
472            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
473            chain_id
474        );
475    }
476
477    // The listing of the events in `read_events_from_index` depends on the
478    // serialization of `RootKey::Event`.
479    #[test]
480    fn test_root_key_event_serialization() {
481        let hash = CryptoHash::test_hash("49");
482        let chain_id = ChainId(hash);
483        let application_description_hash = CryptoHash::test_hash("42");
484        let application_id = ApplicationId::new(application_description_hash);
485        let application_id = GenericApplicationId::User(application_id);
486        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
487        let stream_id = StreamId {
488            application_id,
489            stream_name,
490        };
491        let prefix = bcs::to_bytes(&stream_id).unwrap();
492
493        let index = 1567;
494        let event_id = EventId {
495            chain_id,
496            stream_id,
497            index,
498        };
499        let key = to_event_key(&event_id);
500        assert!(key.starts_with(&prefix));
501    }
502
503    // The height index lookup depends on the serialization of RootKey::BlockByHeight
504    // and to_height_key, following the same pattern as Event.
505    #[test]
506    fn test_root_key_block_by_height_serialization() {
507        use linera_base::data_types::BlockHeight;
508
509        let hash = CryptoHash::default();
510        let chain_id = ChainId(hash);
511        let height = BlockHeight(42);
512
513        // RootKey::BlockByHeight uses only ChainId for partitioning (like Event)
514        let root_key = RootKey::BlockByHeight(chain_id).bytes();
515        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
516        assert_eq!(deserialized_chain_id, chain_id);
517
518        // Height is encoded as a key (like index in Event)
519        let height_key = to_height_key(height);
520        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
521        assert_eq!(deserialized_height, height);
522    }
523
    // Checks that `add_certificate` writes the height index entry
    // (chain_id partition, height key -> certificate hash) alongside the
    // certificate data itself.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        // Create test storage
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Create a test certificate at a specific height. All hash fields are
        // arbitrary fixed test values; only `chain_id` and `height` matter here.
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("transactions_hash")),
                messages_hash: CryptoHash::new(&TestString::new("messages_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_msg_blocks_hash",
                )),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                    "prev_event_blocks_hash",
                )),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_responses_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("operation_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let certificate = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);

        // Write certificate
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Verify height index was created (following Event pattern): read the
        // raw index partition directly and decode the stored hash.
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }
585
    // Checks `read_certificates_by_heights`: results come back in the order of
    // the requested heights, missing heights yield `None`, duplicates are
    // allowed, and an empty request yields an empty result.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write certificates at heights 1, 3, 5
        let mut batch = MultiPartitionBatch::new();
        let mut expected_certs = vec![];

        // NOTE(review): these "..._{height}" strings are plain literals, not
        // `format!` templates, so every iteration uses identical hash seeds.
        // The certificates still differ because the header height differs, but
        // this looks like it was meant to be `format!` — confirm intent.
        for height in [1, 3, 5] {
            let block = Block {
                header: BlockHeader {
                    chain_id,
                    epoch: Epoch::ZERO,
                    height: BlockHeight(height),
                    timestamp: Timestamp::from(0),
                    state_hash: CryptoHash::new(&TestString::new("state_hash_{height}")),
                    previous_block_hash: None,
                    authenticated_signer: None,
                    transactions_hash: CryptoHash::new(&TestString::new("tx_hash_{height}")),
                    messages_hash: CryptoHash::new(&TestString::new("msg_hash_{height}")),
                    previous_message_blocks_hash: CryptoHash::new(&TestString::new(
                        "pmb_hash_{height}",
                    )),
                    previous_event_blocks_hash: CryptoHash::new(&TestString::new(
                        "peb_hash_{height}",
                    )),
                    oracle_responses_hash: CryptoHash::new(&TestString::new(
                        "oracle_hash_{height}",
                    )),
                    events_hash: CryptoHash::new(&TestString::new("events_hash_{height}")),
                    blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_{height}")),
                    operation_results_hash: CryptoHash::new(&TestString::new(
                        "op_results_hash_{height}",
                    )),
                },
                body: BlockBody {
                    transactions: vec![],
                    messages: vec![],
                    previous_message_blocks: Default::default(),
                    previous_event_blocks: Default::default(),
                    oracle_responses: vec![],
                    events: vec![],
                    blobs: vec![],
                    operation_results: vec![],
                },
            };
            let confirmed_block = ConfirmedBlock::new(block);
            let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
            expected_certs.push((height, cert.clone()));
            batch.add_certificate(&cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Test: Read in order [1, 3, 5]
        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );

        // Test: Read out of order [5, 1, 3] — results must follow request order.
        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            expected_certs[2].1.hash()
        );
        assert_eq!(
            result[1].as_ref().unwrap().hash(),
            expected_certs[0].1.hash()
        );
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            expected_certs[1].1.hash()
        );

        // Test: Read with missing heights [1, 2, 3] plus a duplicated height 3.
        let heights = vec![
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 4); // BlockHeight(3) was duplicated.
        assert!(result[0].is_some());
        assert!(result[1].is_none()); // Height 2 doesn't exist
        assert!(result[2].is_some());
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            result[3].as_ref().unwrap().hash()
        ); // Both correspond to height 3

        // Test: Empty heights
        let heights = vec![];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }
709
    // Checks that height-index lookups are properly partitioned per chain:
    // two chains with a certificate at the same height must each resolve to
    // their own certificate, and an unwritten height resolves to `None`.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        // Create certificates for two different chains at same heights
        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));

        let mut batch = MultiPartitionBatch::new();

        // Chain A: certificate at height 10; hash seeds are arbitrary but
        // distinct from chain B's so the two certificates differ.
        let block_a = Block {
            header: BlockHeader {
                chain_id: chain_a,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash_a")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_a")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash_a")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_a")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_a")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_a")),
                events_hash: CryptoHash::new(&TestString::new("events_hash_a")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_a")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_a")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block_a = ConfirmedBlock::new(block_a);
        let cert_a = ConfirmedBlockCertificate::new(confirmed_block_a, Round::Fast, vec![]);
        batch.add_certificate(&cert_a).unwrap();

        // Chain B: certificate at the same height 10.
        let block_b = Block {
            header: BlockHeader {
                chain_id: chain_b,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash_b")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash_b")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash_b")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash_b")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash_b")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash_b")),
                events_hash: CryptoHash::new(&TestString::new("events_hash_b")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash_b")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash_b")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block_b = ConfirmedBlock::new(block_b);
        let cert_b = ConfirmedBlockCertificate::new(confirmed_block_b, Round::Fast, vec![]);
        batch.add_certificate(&cert_b).unwrap();

        storage.write_batch(batch).await.unwrap();

        // Read from chain A - should get cert A
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        // Read from chain B - should get cert B
        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // Read from chain A at a height neither chain has written - should get
        // None. (Previous comment claimed only chain B has this height; in fact
        // height 20 was never written for either chain.)
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }
810
    /// Checks that a certificate written through `add_certificate` can be read back both
    /// by hash and by `(chain_id, height)`, and that the two paths return identical data.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Write certificate
        let mut batch = MultiPartitionBatch::new();
        // Minimal confirmed block fixture: empty body, arbitrary-but-distinct header hashes.
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(7),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
        let hash = cert.hash();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read by hash
        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();

        // Read by height
        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        // Should be identical
        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }
871
    /// Tests that `write_certificate_height_indices` correctly populates the reverse index
    /// so that `read_certificates_by_heights` can find certificates that were written
    /// without the height index (simulating old data or fallback scenarios).
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_write_certificate_height_indices_populates_reverse_index() {
        use linera_views::{batch::Batch, store::WritableKeyValueStore as _};

        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Create a certificate but write ONLY the certificate data, NOT the height index.
        // This simulates old data written before the height index feature existed.
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height: BlockHeight(10),
                timestamp: Timestamp::from(0),
                state_hash: CryptoHash::new(&TestString::new("state_hash")),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: CryptoHash::new(&TestString::new("tx_hash")),
                messages_hash: CryptoHash::new(&TestString::new("msg_hash")),
                previous_message_blocks_hash: CryptoHash::new(&TestString::new("pmb_hash")),
                previous_event_blocks_hash: CryptoHash::new(&TestString::new("peb_hash")),
                oracle_responses_hash: CryptoHash::new(&TestString::new("oracle_hash")),
                events_hash: CryptoHash::new(&TestString::new("events_hash")),
                blobs_hash: CryptoHash::new(&TestString::new("blobs_hash")),
                operation_results_hash: CryptoHash::new(&TestString::new("op_results_hash")),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        let confirmed_block = ConfirmedBlock::new(block);
        let cert = ConfirmedBlockCertificate::new(confirmed_block, Round::Fast, vec![]);
        let hash = cert.hash();
        let height = BlockHeight(10);

        // Write certificate data directly (bypassing add_certificate which writes the index)
        // using the same partition and keys that add_certificate would use.
        let root_key = RootKey::ConfirmedBlock(hash).bytes();
        let store = storage.database.open_shared(&root_key).unwrap();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(
            crate::db_storage::LITE_CERTIFICATE_KEY.to_vec(),
            bcs::to_bytes(&cert.lite_certificate()).unwrap(),
        );
        batch.put_key_value_bytes(
            crate::db_storage::BLOCK_KEY.to_vec(),
            bcs::to_bytes(&cert.value()).unwrap(),
        );
        store.write_batch(batch).await.unwrap();

        // Verify height index does NOT exist yet
        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_none(),
            "Height index should not exist before write_certificate_height_indices"
        );

        // Verify certificate can still be read by hash
        let cert_by_hash = storage.read_certificate(hash).await.unwrap();
        assert!(cert_by_hash.is_some(), "Certificate should exist by hash");

        // Write the height index (simulating what updater does after fallback)
        storage
            .write_certificate_height_indices(chain_id, &[(height, hash)])
            .await
            .unwrap();

        // Verify read_certificates_by_heights NOW returns the certificate
        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_some(),
            "Height index should exist after write_certificate_height_indices"
        );
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            hash,
            "Certificate retrieved by height should match original"
        );
    }
968}
969
/// An implementation of [`DualStoreRootKeyAssignment`] that stores the
/// chain states into the first store, and everything else into the second.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
974
975impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
976    fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
977        if root_key.is_empty() {
978            return Ok(StoreInUse::Second);
979        }
980        let store = match is_chain_state(root_key) {
981            true => StoreInUse::First,
982            false => StoreInUse::Second,
983        };
984        Ok(store)
985    }
986}
987
/// A `Clock` implementation using the system clock, i.e. real wall-clock time.
#[derive(Clone)]
pub struct WallClock;
991
992#[cfg_attr(not(web), async_trait)]
993#[cfg_attr(web, async_trait(?Send))]
994impl Clock for WallClock {
995    fn current_time(&self) -> Timestamp {
996        Timestamp::now()
997    }
998
999    async fn sleep(&self, delta: TimeDelta) {
1000        linera_base::time::timer::sleep(delta.as_duration()).await
1001    }
1002
1003    async fn sleep_until(&self, timestamp: Timestamp) {
1004        let delta = timestamp.delta_since(Timestamp::now());
1005        if delta > TimeDelta::ZERO {
1006            self.sleep(delta).await
1007        }
1008    }
1009}
1010
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current simulated time.
    time: Timestamp,
    /// Pending sleepers keyed by wake-up deadline. Keys are wrapped in `Reverse` so
    /// that `set` can use `split_off` to extract every deadline at or before the
    /// new time in one operation.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// Optional callback that decides whether to auto-advance for a given target timestamp.
    /// Returns `true` if the clock should auto-advance to that time.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
1020
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the time and wakes every sleeper whose deadline is at or before `time`.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are `Reverse(deadline)`, so `split_off(&Reverse(time))` removes and
        // returns exactly the entries with deadline <= time, i.e. the sleeps now due.
        let senders = self.sleeps.split_off(&Reverse(time));
        for sender in senders.into_values().flatten() {
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        }
    }

    /// Registers a sleep of duration `delta`, measured from the current simulated time.
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        let target_time = self.time.saturating_add(delta);
        self.add_sleep_until(target_time)
    }

    /// Registers a sleep until `time`. The returned receiver completes when the clock
    /// reaches that time — immediately if it already has, or right away when the
    /// sleep callback opts into auto-advancing.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let should_auto_advance = self
            .sleep_callback
            .as_ref()
            .is_some_and(|callback| callback(time));
        if should_auto_advance && time > self.time {
            // Auto-advance mode: immediately advance the clock and complete the sleep.
            self.set(time);
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else if self.time >= time {
            // Deadline already reached: complete without parking.
            // Receiver may have been dropped if the sleep was cancelled.
            sender.send(()).ok();
        } else {
            // Park the sender until `set` advances the clock to (or past) `time`.
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
        }
        receiver
    }
}
1057
/// A clock implementation that uses a stored number of microseconds and that can be updated
/// explicitly. All clones share the same time, and setting it in one clone updates all the others.
/// Intended for deterministic tests only.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
1063
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Reads the simulated time shared by all clones of this clock.
    fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Waits until the simulated clock has advanced by `delta`. A zero delta
    /// completes immediately without registering a sleeper.
    async fn sleep(&self, delta: TimeDelta) {
        if delta == TimeDelta::ZERO {
            return;
        }
        let pending = self.lock().add_sleep(delta);
        // The sender may have been dropped along with the clock; just stop waiting.
        pending.await.ok();
    }

    /// Waits until the simulated clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        let pending = self.lock().add_sleep_until(timestamp);
        // The sender may have been dropped along with the clock; just stop waiting.
        pending.await.ok();
    }
}
1087
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to 0, i.e. the Unix epoch.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time, waking any sleepers whose deadline has been reached.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by the specified delta (saturating at the maximum).
    pub fn add(&self, delta: TimeDelta) {
        let mut inner = self.lock();
        let advanced = inner.time.saturating_add(delta);
        inner.set(advanced);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Sets a callback that decides whether to auto-advance for each sleep call.
    ///
    /// The callback receives the target timestamp and should return `true` if the clock
    /// should auto-advance to that time, or `false` if the sleep should block normally.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Clears the sleep callback, restoring normal blocking sleeps.
    pub fn clear_sleep_callback(&self) {
        self.lock().sleep_callback = None;
    }

    /// Locks the shared inner state; panics only if another holder panicked.
    fn lock(&self) -> std::sync::MutexGuard<TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
1132
1133#[cfg_attr(not(web), async_trait)]
1134#[cfg_attr(web, async_trait(?Send))]
1135impl<Database, C> Storage for DbStorage<Database, C>
1136where
1137    Database: KeyValueDatabase<
1138            Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
1139            Error: Send + Sync,
1140        > + Clone
1141        + linera_base::util::traits::AutoTraits
1142        + 'static,
1143    C: Clock + Clone + Send + Sync + 'static,
1144{
1145    type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
1146    type Clock = C;
1147    type BlockExporterContext = ViewContext<u32, Database::Store>;
1148
    /// Returns the clock used by this storage instance.
    fn clock(&self) -> &C {
        &self.clock
    }
1152
    /// Returns the shared execution thread pool.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
1156
1157    #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
1158    async fn load_chain(
1159        &self,
1160        chain_id: ChainId,
1161    ) -> Result<ChainStateView<Self::Context>, ViewError> {
1162        #[cfg(with_metrics)]
1163        let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
1164        let runtime_context = ChainRuntimeContext {
1165            storage: self.clone(),
1166            thread_pool: self.thread_pool.clone(),
1167            chain_id,
1168            execution_runtime_config: self.execution_runtime_config,
1169            user_contracts: self.user_contracts.clone(),
1170            user_services: self.user_services.clone(),
1171        };
1172        let root_key = RootKey::ChainState(chain_id).bytes();
1173        let store = self.database.open_exclusive(&root_key)?;
1174        let context = ViewContext::create_root_context(store, runtime_context).await?;
1175        ChainStateView::load(context).await
1176    }
1177
1178    #[instrument(level = "trace", skip_all, fields(%blob_id))]
1179    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1180        let root_key = RootKey::Blob(blob_id).bytes();
1181        let store = self.database.open_shared(&root_key)?;
1182        let test = store.contains_key(BLOB_KEY).await?;
1183        #[cfg(with_metrics)]
1184        metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
1185        Ok(test)
1186    }
1187
1188    #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
1189    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
1190        let mut missing_blobs = Vec::new();
1191        for blob_id in blob_ids {
1192            let root_key = RootKey::Blob(*blob_id).bytes();
1193            let store = self.database.open_shared(&root_key)?;
1194            if !store.contains_key(BLOB_KEY).await? {
1195                missing_blobs.push(*blob_id);
1196            }
1197        }
1198        #[cfg(with_metrics)]
1199        metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
1200        Ok(missing_blobs)
1201    }
1202
1203    #[instrument(skip_all, fields(%blob_id))]
1204    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1205        let root_key = RootKey::Blob(blob_id).bytes();
1206        let store = self.database.open_shared(&root_key)?;
1207        let test = store.contains_key(BLOB_STATE_KEY).await?;
1208        #[cfg(with_metrics)]
1209        metrics::CONTAINS_BLOB_STATE_COUNTER
1210            .with_label_values(&[])
1211            .inc();
1212        Ok(test)
1213    }
1214
1215    #[instrument(skip_all, fields(%hash))]
1216    async fn read_confirmed_block(
1217        &self,
1218        hash: CryptoHash,
1219    ) -> Result<Option<ConfirmedBlock>, ViewError> {
1220        let root_key = RootKey::ConfirmedBlock(hash).bytes();
1221        let store = self.database.open_shared(&root_key)?;
1222        let value = store.read_value(BLOCK_KEY).await?;
1223        #[cfg(with_metrics)]
1224        metrics::READ_CONFIRMED_BLOCK_COUNTER
1225            .with_label_values(&[])
1226            .inc();
1227        Ok(value)
1228    }
1229
1230    #[instrument(skip_all)]
1231    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
1232        &self,
1233        hashes: I,
1234    ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError> {
1235        let hashes = hashes.into_iter().collect::<Vec<_>>();
1236        if hashes.is_empty() {
1237            return Ok(Vec::new());
1238        }
1239        let root_keys = Self::get_root_keys_for_certificates(&hashes);
1240        let mut blocks = Vec::new();
1241        for root_key in root_keys {
1242            let store = self.database.open_shared(&root_key)?;
1243            blocks.push(store.read_value(BLOCK_KEY).await?);
1244        }
1245        #[cfg(with_metrics)]
1246        metrics::READ_CONFIRMED_BLOCKS_COUNTER
1247            .with_label_values(&[])
1248            .inc_by(hashes.len() as u64);
1249        Ok(blocks)
1250    }
1251
1252    #[instrument(skip_all, fields(%blob_id))]
1253    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
1254        let root_key = RootKey::Blob(blob_id).bytes();
1255        let store = self.database.open_shared(&root_key)?;
1256        let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
1257        #[cfg(with_metrics)]
1258        metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
1259        Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
1260    }
1261
1262    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1263    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
1264        if blob_ids.is_empty() {
1265            return Ok(Vec::new());
1266        }
1267        let mut blobs = Vec::new();
1268        for blob_id in blob_ids {
1269            blobs.push(self.read_blob(*blob_id).await?);
1270        }
1271        #[cfg(with_metrics)]
1272        metrics::READ_BLOB_COUNTER
1273            .with_label_values(&[])
1274            .inc_by(blob_ids.len() as u64);
1275        Ok(blobs)
1276    }
1277
1278    #[instrument(skip_all, fields(%blob_id))]
1279    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
1280        let root_key = RootKey::Blob(blob_id).bytes();
1281        let store = self.database.open_shared(&root_key)?;
1282        let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1283        #[cfg(with_metrics)]
1284        metrics::READ_BLOB_STATE_COUNTER
1285            .with_label_values(&[])
1286            .inc();
1287        Ok(blob_state)
1288    }
1289
1290    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1291    async fn read_blob_states(
1292        &self,
1293        blob_ids: &[BlobId],
1294    ) -> Result<Vec<Option<BlobState>>, ViewError> {
1295        if blob_ids.is_empty() {
1296            return Ok(Vec::new());
1297        }
1298        let mut blob_states = Vec::new();
1299        for blob_id in blob_ids {
1300            blob_states.push(self.read_blob_state(*blob_id).await?);
1301        }
1302        #[cfg(with_metrics)]
1303        metrics::READ_BLOB_STATES_COUNTER
1304            .with_label_values(&[])
1305            .inc_by(blob_ids.len() as u64);
1306        Ok(blob_states)
1307    }
1308
1309    #[instrument(skip_all, fields(blob_id = %blob.id()))]
1310    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
1311        let mut batch = MultiPartitionBatch::new();
1312        batch.add_blob(blob)?;
1313        self.write_batch(batch).await?;
1314        Ok(())
1315    }
1316
1317    #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1318    async fn maybe_write_blob_states(
1319        &self,
1320        blob_ids: &[BlobId],
1321        blob_state: BlobState,
1322    ) -> Result<(), ViewError> {
1323        if blob_ids.is_empty() {
1324            return Ok(());
1325        }
1326        let mut maybe_blob_states = Vec::new();
1327        for blob_id in blob_ids {
1328            let root_key = RootKey::Blob(*blob_id).bytes();
1329            let store = self.database.open_shared(&root_key)?;
1330            let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1331            maybe_blob_states.push(maybe_blob_state);
1332        }
1333        let mut batch = MultiPartitionBatch::new();
1334        for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
1335            match maybe_blob_state {
1336                None => {
1337                    batch.add_blob_state(*blob_id, &blob_state)?;
1338                }
1339                Some(state) => {
1340                    if state.epoch < blob_state.epoch {
1341                        batch.add_blob_state(*blob_id, &blob_state)?;
1342                    }
1343                }
1344            }
1345        }
1346        // We tolerate race conditions because two active chains are likely to
1347        // be both from the latest epoch, and otherwise failing to pick the
1348        // more recent blob state has limited impact.
1349        self.write_batch(batch).await?;
1350        Ok(())
1351    }
1352
1353    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1354    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
1355        if blobs.is_empty() {
1356            return Ok(Vec::new());
1357        }
1358        let mut batch = MultiPartitionBatch::new();
1359        let mut blob_states = Vec::new();
1360        for blob in blobs {
1361            let root_key = RootKey::Blob(blob.id()).bytes();
1362            let store = self.database.open_shared(&root_key)?;
1363            let has_state = store.contains_key(BLOB_STATE_KEY).await?;
1364            blob_states.push(has_state);
1365            if has_state {
1366                batch.add_blob(blob)?;
1367            }
1368        }
1369        self.write_batch(batch).await?;
1370        Ok(blob_states)
1371    }
1372
1373    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1374    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
1375        if blobs.is_empty() {
1376            return Ok(());
1377        }
1378        let mut batch = MultiPartitionBatch::new();
1379        for blob in blobs {
1380            batch.add_blob(blob)?;
1381        }
1382        self.write_batch(batch).await
1383    }
1384
1385    #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1386    async fn write_blobs_and_certificate(
1387        &self,
1388        blobs: &[Blob],
1389        certificate: &ConfirmedBlockCertificate,
1390    ) -> Result<(), ViewError> {
1391        let mut batch = MultiPartitionBatch::new();
1392        for blob in blobs {
1393            batch.add_blob(blob)?;
1394        }
1395        batch.add_certificate(certificate)?;
1396        self.write_batch(batch).await
1397    }
1398
1399    #[instrument(skip_all, fields(%hash))]
1400    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1401        let root_key = RootKey::ConfirmedBlock(hash).bytes();
1402        let store = self.database.open_shared(&root_key)?;
1403        let results = store.contains_keys(&get_block_keys()).await?;
1404        #[cfg(with_metrics)]
1405        metrics::CONTAINS_CERTIFICATE_COUNTER
1406            .with_label_values(&[])
1407            .inc();
1408        Ok(results[0] && results[1])
1409    }
1410
1411    #[instrument(skip_all, fields(%hash))]
1412    async fn read_certificate(
1413        &self,
1414        hash: CryptoHash,
1415    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1416        let root_key = RootKey::ConfirmedBlock(hash).bytes();
1417        let store = self.database.open_shared(&root_key)?;
1418        let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1419        #[cfg(with_metrics)]
1420        metrics::READ_CERTIFICATE_COUNTER
1421            .with_label_values(&[])
1422            .inc();
1423        Self::deserialize_certificate(&values, hash)
1424    }
1425
    /// Reads the certificates for the given hashes, deserializing each raw pair into a
    /// [`ConfirmedBlockCertificate`]; missing entries yield `None`, in input order.
    #[instrument(skip_all)]
    async fn read_certificates(
        &self,
        hashes: &[CryptoHash],
    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
        let raw_certs = self.read_certificates_raw(hashes).await?;

        raw_certs
            .into_iter()
            .zip(hashes)
            .map(|(maybe_raw, hash)| {
                // An entry is `None` when either of its two stored keys was absent.
                let Some((lite_cert_bytes, confirmed_block_bytes)) = maybe_raw else {
                    return Ok(None);
                };
                let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
                let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
                // NOTE(review): this panics if the stored value does not hash to its key
                // (i.e. corrupted storage) instead of returning an error — consider
                // mapping this case to a `ViewError` instead.
                assert_eq!(&value.hash(), hash);
                let certificate = cert
                    .with_value(value)
                    .ok_or(ViewError::InconsistentEntries)?;
                Ok(Some(certificate))
            })
            .collect()
    }
1450
1451    #[instrument(skip_all)]
1452    async fn read_certificates_raw(
1453        &self,
1454        hashes: &[CryptoHash],
1455    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
1456        if hashes.is_empty() {
1457            return Ok(Vec::new());
1458        }
1459        let root_keys = Self::get_root_keys_for_certificates(hashes);
1460        let mut values = Vec::new();
1461        for root_key in root_keys {
1462            let store = self.database.open_shared(&root_key)?;
1463            values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
1464        }
1465        #[cfg(with_metrics)]
1466        metrics::READ_CERTIFICATES_COUNTER
1467            .with_label_values(&[])
1468            .inc_by(hashes.len() as u64);
1469        Ok(values
1470            .chunks_exact(2)
1471            .map(|chunk| {
1472                let lite_cert_bytes = chunk[0].as_ref()?;
1473                let confirmed_block_bytes = chunk[1].as_ref()?;
1474                Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
1475            })
1476            .collect())
1477    }
1478
1479    async fn read_certificate_hashes_by_heights(
1480        &self,
1481        chain_id: ChainId,
1482        heights: &[BlockHeight],
1483    ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1484        if heights.is_empty() {
1485            return Ok(Vec::new());
1486        }
1487
1488        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1489        let store = self.database.open_shared(&index_root_key)?;
1490        let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
1491        let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
1492        let hash_options: Vec<Option<CryptoHash>> = hash_bytes
1493            .into_iter()
1494            .map(|opt| {
1495                opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
1496                    .transpose()
1497            })
1498            .collect::<Result<_, _>>()?;
1499
1500        Ok(hash_options)
1501    }
1502
    /// Reads raw certificate byte pairs for the given heights of `chain_id` by first
    /// resolving heights to hashes through the height index, then fetching each unique
    /// hash once and fanning the result out to every requesting position.
    #[instrument(skip_all)]
    async fn read_certificates_by_heights_raw(
        &self,
        chain_id: ChainId,
        heights: &[BlockHeight],
    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
        let hashes: Vec<Option<CryptoHash>> = self
            .read_certificate_hashes_by_heights(chain_id, heights)
            .await?;

        // Map from hash to all indices in the heights array (handles duplicates)
        let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
        for (index, maybe_hash) in hashes.iter().enumerate() {
            if let Some(hash) = maybe_hash {
                indices.entry(*hash).or_default().push(index);
            }
        }

        // Deduplicate hashes for the storage query
        let unique_hashes = indices.keys().copied().collect::<Vec<_>>();

        // Positions whose height had no index entry remain `None`.
        let mut result = vec![None; heights.len()];

        for (raw_cert, hash) in self
            .read_certificates_raw(&unique_hashes)
            .await?
            .into_iter()
            .zip(unique_hashes)
        {
            // Copy the fetched pair into every position that requested this hash.
            if let Some(idx_list) = indices.get(&hash) {
                for &index in idx_list {
                    result[index] = raw_cert.clone();
                }
            } else {
                // This should not happen, but log a warning if it does.
                // (`unique_hashes` is built from `indices`, so every hash has an entry.)
                tracing::warn!(
                    hash=?hash,
                    "certificate hash not found in indices map",
                );
            }
        }

        Ok(result)
    }
1547
1548    #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1549    async fn read_certificates_by_heights(
1550        &self,
1551        chain_id: ChainId,
1552        heights: &[BlockHeight],
1553    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1554        self.read_certificates_by_heights_raw(chain_id, heights)
1555            .await?
1556            .into_iter()
1557            .map(|maybe_raw| match maybe_raw {
1558                None => Ok(None),
1559                Some((lite_cert_bytes, confirmed_block_bytes)) => {
1560                    let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1561                    let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1562                    let certificate = cert
1563                        .with_value(value)
1564                        .ok_or(ViewError::InconsistentEntries)?;
1565                    Ok(Some(certificate))
1566                }
1567            })
1568            .collect()
1569    }
1570
1571    #[instrument(skip_all, fields(%chain_id, indices_len = indices.len()))]
1572    async fn write_certificate_height_indices(
1573        &self,
1574        chain_id: ChainId,
1575        indices: &[(BlockHeight, CryptoHash)],
1576    ) -> Result<(), ViewError> {
1577        if indices.is_empty() {
1578            return Ok(());
1579        }
1580
1581        let mut batch = MultiPartitionBatch::new();
1582        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1583        let key_values: Vec<(Vec<u8>, Vec<u8>)> = indices
1584            .iter()
1585            .map(|(height, hash)| {
1586                let height_key = to_height_key(*height);
1587                let hash_value = bcs::to_bytes(hash).unwrap();
1588                (height_key, hash_value)
1589            })
1590            .collect();
1591        batch.put_key_values(index_root_key, key_values);
1592        self.write_batch(batch).await
1593    }
1594
1595    #[instrument(skip_all, fields(event_id = ?event_id))]
1596    async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
1597        let event_key = to_event_key(&event_id);
1598        let root_key = RootKey::Event(event_id.chain_id).bytes();
1599        let store = self.database.open_shared(&root_key)?;
1600        let event = store.read_value_bytes(&event_key).await?;
1601        #[cfg(with_metrics)]
1602        metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
1603        Ok(event)
1604    }
1605
1606    #[instrument(skip_all, fields(event_id = ?event_id))]
1607    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1608        let event_key = to_event_key(&event_id);
1609        let root_key = RootKey::Event(event_id.chain_id).bytes();
1610        let store = self.database.open_shared(&root_key)?;
1611        let exists = store.contains_key(&event_key).await?;
1612        #[cfg(with_metrics)]
1613        metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
1614        Ok(exists)
1615    }
1616
1617    #[instrument(skip_all, fields(%chain_id, %stream_id, %start_index))]
1618    async fn read_events_from_index(
1619        &self,
1620        chain_id: &ChainId,
1621        stream_id: &StreamId,
1622        start_index: u32,
1623    ) -> Result<Vec<IndexAndEvent>, ViewError> {
1624        let root_key = RootKey::Event(*chain_id).bytes();
1625        let store = self.database.open_shared(&root_key)?;
1626        let mut keys = Vec::new();
1627        let mut indices = Vec::new();
1628        let prefix = bcs::to_bytes(stream_id).unwrap();
1629        for short_key in store.find_keys_by_prefix(&prefix).await? {
1630            let index = bcs::from_bytes::<u32>(&short_key)?;
1631            if index >= start_index {
1632                let mut key = prefix.clone();
1633                key.extend(short_key);
1634                keys.push(key);
1635                indices.push(index);
1636            }
1637        }
1638        let values = store.read_multi_values_bytes(&keys).await?;
1639        let mut returned_values = Vec::new();
1640        for (index, value) in indices.into_iter().zip(values) {
1641            let event = value.unwrap();
1642            returned_values.push(IndexAndEvent { index, event });
1643        }
1644        Ok(returned_values)
1645    }
1646
1647    #[instrument(skip_all)]
1648    async fn write_events(
1649        &self,
1650        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1651    ) -> Result<(), ViewError> {
1652        let mut batch = MultiPartitionBatch::new();
1653        for (event_id, value) in events {
1654            batch.add_event(event_id, value)?;
1655        }
1656        self.write_batch(batch).await
1657    }
1658
1659    #[instrument(skip_all)]
1660    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1661        let root_key = RootKey::NetworkDescription.bytes();
1662        let store = self.database.open_shared(&root_key)?;
1663        let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1664        #[cfg(with_metrics)]
1665        metrics::READ_NETWORK_DESCRIPTION
1666            .with_label_values(&[])
1667            .inc();
1668        Ok(maybe_value)
1669    }
1670
1671    #[instrument(skip_all)]
1672    async fn write_network_description(
1673        &self,
1674        information: &NetworkDescription,
1675    ) -> Result<(), ViewError> {
1676        let mut batch = MultiPartitionBatch::new();
1677        batch.add_network_description(information)?;
1678        self.write_batch(batch).await?;
1679        Ok(())
1680    }
1681
    /// Returns the Wasm runtime this storage was configured with, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1685
    /// Creates a root view context for the block exporter with the given ID,
    /// backed by an exclusively-opened partition keyed by that ID.
    #[instrument(skip_all)]
    async fn block_exporter_context(
        &self,
        block_exporter_id: u32,
    ) -> Result<Self::BlockExporterContext, ViewError> {
        let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
        // Exclusive (not shared) access, unlike the other partitions above.
        let store = self.database.open_exclusive(&root_key)?;
        Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
    }
1695}
1696
1697impl<Database, C> DbStorage<Database, C>
1698where
1699    Database: KeyValueDatabase + Clone,
1700    Database::Store: KeyValueStore + Clone,
1701    C: Clock,
1702    Database::Error: Send + Sync,
1703{
1704    #[instrument(skip_all)]
1705    fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1706        hashes
1707            .iter()
1708            .map(|hash| RootKey::ConfirmedBlock(*hash).bytes())
1709            .collect()
1710    }
1711
1712    #[instrument(skip_all)]
1713    fn deserialize_certificate(
1714        pair: &[Option<Vec<u8>>],
1715        hash: CryptoHash,
1716    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1717        let Some(cert_bytes) = pair[0].as_ref() else {
1718            return Ok(None);
1719        };
1720        let Some(value_bytes) = pair[1].as_ref() else {
1721            return Ok(None);
1722        };
1723        let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1724        let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1725        assert_eq!(value.hash(), hash);
1726        let certificate = cert
1727            .with_value(value)
1728            .ok_or(ViewError::InconsistentEntries)?;
1729        Ok(Some(certificate))
1730    }
1731
1732    #[instrument(skip_all)]
1733    async fn write_entry(
1734        store: &Database::Store,
1735        key_values: Vec<(Vec<u8>, Vec<u8>)>,
1736    ) -> Result<(), ViewError> {
1737        let mut batch = Batch::new();
1738        for (key, value) in key_values {
1739            batch.put_key_value_bytes(key, value);
1740        }
1741        store.write_batch(batch).await?;
1742        Ok(())
1743    }
1744
1745    #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1746    pub(crate) async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1747        if batch.keys_value_bytes.is_empty() {
1748            return Ok(());
1749        }
1750        let mut futures = Vec::new();
1751        for (root_key, key_values) in batch.keys_value_bytes {
1752            let store = self.database.open_shared(&root_key)?;
1753            futures.push(async move { Self::write_entry(&store, key_values).await });
1754        }
1755        futures::future::try_join_all(futures).await?;
1756        Ok(())
1757    }
1758}
1759
impl<Database, C> DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone + 'static,
    Database::Error: Send + Sync,
    Database::Store: KeyValueStore + Clone + 'static,
    C: Clock + Clone + Send + Sync + 'static,
{
    /// Creates a storage wrapping `database` with the given Wasm runtime
    /// selection and clock. The user-contract and user-service caches start
    /// empty, and the execution runtime configuration takes its default.
    pub(crate) fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            // The `Arc` here is required on native but useless on the Web.
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets whether contract log messages should be output.
    /// Builder-style: consumes and returns `self`.
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1787
1788impl<Database> DbStorage<Database, WallClock>
1789where
1790    Database: KeyValueDatabase + Clone + 'static,
1791    Database::Error: Send + Sync,
1792    Database::Store: KeyValueStore + Clone + 'static,
1793{
1794    pub async fn maybe_create_and_connect(
1795        config: &Database::Config,
1796        namespace: &str,
1797        wasm_runtime: Option<WasmRuntime>,
1798    ) -> Result<Self, ViewError> {
1799        let database = Database::maybe_create_and_connect(config, namespace).await?;
1800        let storage = Self::new(database, wasm_runtime, WallClock);
1801        Ok(storage)
1802    }
1803
1804    pub async fn connect(
1805        config: &Database::Config,
1806        namespace: &str,
1807        wasm_runtime: Option<WasmRuntime>,
1808    ) -> Result<Self, ViewError> {
1809        let database = Database::connect(config, namespace).await?;
1810        let storage = Self::new(database, wasm_runtime, WallClock);
1811        Ok(storage)
1812    }
1813
1814    /// Lists the blob IDs of the storage.
1815    pub async fn list_blob_ids(
1816        config: &Database::Config,
1817        namespace: &str,
1818    ) -> Result<Vec<BlobId>, ViewError> {
1819        let database = Database::connect(config, namespace).await?;
1820        let root_keys = database.list_root_keys().await?;
1821        let mut blob_ids = Vec::new();
1822        for root_key in root_keys {
1823            if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1824                let root_key_red = &root_key[1..];
1825                let blob_id = bcs::from_bytes(root_key_red)?;
1826                blob_ids.push(blob_id);
1827            }
1828        }
1829        Ok(blob_ids)
1830    }
1831}
1832
1833impl<Database> DbStorage<Database, WallClock>
1834where
1835    Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1836    Database::Error: Send + Sync,
1837{
1838    /// Lists the chain IDs of the storage.
1839    pub async fn list_chain_ids(
1840        config: &Database::Config,
1841        namespace: &str,
1842    ) -> Result<Vec<ChainId>, ViewError> {
1843        let database = Database::connect(config, namespace).await?;
1844        let root_keys = database.list_root_keys().await?;
1845        let mut chain_ids = Vec::new();
1846        for root_key in root_keys {
1847            if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1848                let root_key_red = &root_key[1..];
1849                let chain_id = bcs::from_bytes(root_key_red)?;
1850                chain_ids.push(chain_id);
1851            }
1852        }
1853        Ok(chain_ids)
1854    }
1855}
1856
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a storage over a fresh test database using a new `TestClock`.
    /// Panics on any setup failure, as is acceptable in test code.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        Self::new_for_testing(config, &namespace, wasm_runtime, TestClock::new())
            .await
            .unwrap()
    }

    /// Recreates the namespace from scratch, wraps it in a storage with the
    /// given clock, and verifies the storage passes the migration check.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, ViewError> {
        let database = Database::recreate_and_connect(&config, namespace).await?;
        let storage = Self::new(database, wasm_runtime, clock);
        storage.assert_is_migrated_storage().await?;
        Ok(storage)
    }
}