1use std::{collections::HashMap, fmt::Debug, sync::Arc};
5
6use async_trait::async_trait;
7#[cfg(with_metrics)]
8use linera_base::prometheus_util::MeasureLatency as _;
9use linera_base::{
10 crypto::CryptoHash,
11 data_types::{Blob, BlockHeight, NetworkDescription, TimeDelta, Timestamp},
12 identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
13};
14use linera_chain::{
15 types::{CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, LiteCertificate},
16 ChainStateView,
17};
18use linera_execution::{
19 BlobState, ExecutionRuntimeConfig, UserContractCode, UserServiceCode, WasmRuntime,
20};
21use linera_views::{
22 backends::dual::{DualStoreRootKeyAssignment, StoreInUse},
23 batch::Batch,
24 context::ViewContext,
25 store::{
26 KeyValueDatabase, KeyValueStore, ReadableKeyValueStore as _, WritableKeyValueStore as _,
27 },
28 views::View,
29 ViewError,
30};
31use serde::{Deserialize, Serialize};
32use tracing::instrument;
33#[cfg(with_testing)]
34use {
35 futures::channel::oneshot::{self, Receiver},
36 linera_views::{random::generate_test_namespace, store::TestKeyValueDatabase},
37 std::{cmp::Reverse, collections::BTreeMap},
38};
39
40use crate::{ChainRuntimeContext, Clock, Storage};
41
/// Prometheus metrics recorded by the storage layer.
///
/// Every counter is registered lazily, on first access, without any label.
#[cfg(with_metrics)]
pub mod metrics {
    use std::sync::LazyLock;

    use linera_base::prometheus_util::{
        exponential_bucket_latencies, register_histogram_vec, register_int_counter_vec,
    };
    use prometheus::{HistogramVec, IntCounterVec};

    /// The metric counting how often a blob is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob",
            "The metric counting how often a blob is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often multiple blobs are tested for existence from storage.
    pub(super) static CONTAINS_BLOBS_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blobs",
            "The metric counting how often multiple blobs are tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is tested for existence from storage.
    pub(super) static CONTAINS_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_blob_state",
            "The metric counting how often a blob state is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is tested for existence from storage.
    pub(super) static CONTAINS_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_certificate",
            "The metric counting how often a certificate is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often a hashed confirmed block is read from storage.
    #[doc(hidden)]
    pub static READ_CONFIRMED_BLOCK_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_confirmed_block",
            "The metric counting how often a hashed confirmed block is read from storage",
            &[],
        )
    });

    /// The metric counting how often confirmed blocks are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CONFIRMED_BLOCKS_COUNTER: LazyLock<IntCounterVec> =
        LazyLock::new(|| {
            register_int_counter_vec(
                "read_confirmed_blocks",
                "The metric counting how often confirmed blocks are read from storage",
                &[],
            )
        });

    /// The metric counting how often a blob is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob",
            "The metric counting how often a blob is read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob state is read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_state",
            "The metric counting how often a blob state is read from storage",
            &[],
        )
    });

    /// The metric counting how often blob states are read from storage.
    #[doc(hidden)]
    pub(super) static READ_BLOB_STATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_blob_states",
            "The metric counting how often blob states are read from storage",
            &[],
        )
    });

    /// The metric counting how often a blob is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_BLOB_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_blob",
            "The metric counting how often a blob is written to storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is read from storage.
    #[doc(hidden)]
    pub static READ_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificate",
            "The metric counting how often a certificate is read from storage",
            &[],
        )
    });

    /// The metric counting how often certificates are read from storage.
    #[doc(hidden)]
    pub(super) static READ_CERTIFICATES_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_certificates",
            "The metric counting how often certificates are read from storage",
            &[],
        )
    });

    /// The metric counting how often a certificate is written to storage.
    #[doc(hidden)]
    pub static WRITE_CERTIFICATE_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_certificate",
            "The metric counting how often a certificate is written to storage",
            &[],
        )
    });

    /// The latency to load a chain state.
    #[doc(hidden)]
    pub(crate) static LOAD_CHAIN_LATENCY: LazyLock<HistogramVec> = LazyLock::new(|| {
        register_histogram_vec(
            "load_chain_latency",
            "The latency to load a chain state",
            &[],
            exponential_bucket_latencies(1000.0),
        )
    });

    /// The metric counting how often an event is read from storage.
    #[doc(hidden)]
    pub(super) static READ_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "read_event",
            "The metric counting how often an event is read from storage",
            &[],
        )
    });

    /// The metric counting how often an event is tested for existence from storage.
    pub(super) static CONTAINS_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "contains_event",
            "The metric counting how often an event is tested for existence from storage",
            &[],
        )
    });

    /// The metric counting how often an event is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_EVENT_COUNTER: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_event",
            "The metric counting how often an event is written to storage",
            &[],
        )
    });

    /// The metric counting how often the network description is read from storage.
    // NOTE(review): the exported name lacks the `read_` prefix that its sibling
    // `write_network_description` has. It is left unchanged because renaming a metric
    // would break existing dashboards and alerts; confirm whether a rename is wanted.
    #[doc(hidden)]
    pub(super) static READ_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "network_description",
            "The metric counting how often the network description is read from storage",
            &[],
        )
    });

    /// The metric counting how often the network description is written to storage.
    #[doc(hidden)]
    pub(super) static WRITE_NETWORK_DESCRIPTION: LazyLock<IntCounterVec> = LazyLock::new(|| {
        register_int_counter_vec(
            "write_network_description",
            "The metric counting how often the network description is written to storage",
            &[],
        )
    });
}
238
/// The key under which a blob's bytes are stored within its `RootKey::Blob` partition.
pub(crate) const BLOB_KEY: &[u8] = &[42];

/// The key under which a blob's state is stored within its `RootKey::Blob` partition.
pub(crate) const BLOB_STATE_KEY: &[u8] = &[49];

/// The key under which the lite certificate is stored within a `RootKey::ConfirmedBlock`
/// partition.
pub(crate) const LITE_CERTIFICATE_KEY: &[u8] = &[91];

/// The key under which the confirmed block is stored within a `RootKey::ConfirmedBlock`
/// partition.
pub(crate) const BLOCK_KEY: &[u8] = &[221];

/// The key under which the network description is stored within the
/// `RootKey::NetworkDescription` partition.
pub(crate) const NETWORK_DESCRIPTION_KEY: &[u8] = &[119];
253
254fn get_block_keys() -> Vec<Vec<u8>> {
255 vec![LITE_CERTIFICATE_KEY.to_vec(), BLOCK_KEY.to_vec()]
256}
257
/// A batch of key-value insertions that may span several partitions.
///
/// Each entry pairs a root key with the list of `(key, value)` pairs to put under it.
#[derive(Default)]
// The nested tuple type is spelled out on purpose rather than hidden behind aliases.
#[allow(clippy::type_complexity)]
pub(crate) struct MultiPartitionBatch {
    /// The pending writes, as `(root_key, [(key, value), ...])` entries in insertion order.
    keys_value_bytes: Vec<(Vec<u8>, Vec<(Vec<u8>, Vec<u8>)>)>,
}
263
264impl MultiPartitionBatch {
265 pub(crate) fn new() -> Self {
266 Self::default()
267 }
268
269 pub(crate) fn put_key_values(
270 &mut self,
271 root_key: Vec<u8>,
272 key_values: Vec<(Vec<u8>, Vec<u8>)>,
273 ) {
274 self.keys_value_bytes.push((root_key, key_values));
275 }
276
277 pub(crate) fn put_key_value(&mut self, root_key: Vec<u8>, key: Vec<u8>, value: Vec<u8>) {
278 self.put_key_values(root_key, vec![(key, value)]);
279 }
280
281 fn add_blob(&mut self, blob: &Blob) -> Result<(), ViewError> {
282 #[cfg(with_metrics)]
283 metrics::WRITE_BLOB_COUNTER.with_label_values(&[]).inc();
284 let root_key = RootKey::Blob(blob.id()).bytes();
285 let key = BLOB_KEY.to_vec();
286 self.put_key_value(root_key, key, blob.bytes().to_vec());
287 Ok(())
288 }
289
290 fn add_blob_state(&mut self, blob_id: BlobId, blob_state: &BlobState) -> Result<(), ViewError> {
291 let root_key = RootKey::Blob(blob_id).bytes();
292 let key = BLOB_STATE_KEY.to_vec();
293 let value = bcs::to_bytes(blob_state)?;
294 self.put_key_value(root_key, key, value);
295 Ok(())
296 }
297
298 fn add_certificate(
307 &mut self,
308 certificate: &ConfirmedBlockCertificate,
309 ) -> Result<(), ViewError> {
310 #[cfg(with_metrics)]
311 metrics::WRITE_CERTIFICATE_COUNTER
312 .with_label_values(&[])
313 .inc();
314 let hash = certificate.hash();
315
316 let root_key = RootKey::ConfirmedBlock(hash).bytes();
318 let mut key_values = Vec::new();
319 let key = LITE_CERTIFICATE_KEY.to_vec();
320 let value = bcs::to_bytes(&certificate.lite_certificate())?;
321 key_values.push((key, value));
322 let key = BLOCK_KEY.to_vec();
323 let value = bcs::to_bytes(&certificate.value())?;
324 key_values.push((key, value));
325 self.put_key_values(root_key, key_values);
326
327 let chain_id = certificate.value().block().header.chain_id;
329 let height = certificate.value().block().header.height;
330 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
331 let height_key = to_height_key(height);
332 let index_value = bcs::to_bytes(&hash)?;
333 self.put_key_value(index_root_key, height_key, index_value);
334
335 Ok(())
336 }
337
338 fn add_event(&mut self, event_id: EventId, value: Vec<u8>) -> Result<(), ViewError> {
339 #[cfg(with_metrics)]
340 metrics::WRITE_EVENT_COUNTER.with_label_values(&[]).inc();
341 let key = to_event_key(&event_id);
342 let root_key = RootKey::Event(event_id.chain_id).bytes();
343 self.put_key_value(root_key, key, value);
344 Ok(())
345 }
346
347 fn add_network_description(
348 &mut self,
349 information: &NetworkDescription,
350 ) -> Result<(), ViewError> {
351 #[cfg(with_metrics)]
352 metrics::WRITE_NETWORK_DESCRIPTION
353 .with_label_values(&[])
354 .inc();
355 let root_key = RootKey::NetworkDescription.bytes();
356 let key = NETWORK_DESCRIPTION_KEY.to_vec();
357 let value = bcs::to_bytes(information)?;
358 self.put_key_value(root_key, key, value);
359 Ok(())
360 }
361}
362
/// An implementation of the `Storage` trait backed by a key-value database.
///
/// The `Clock` parameter selects the time source and defaults to [`WallClock`].
#[derive(Clone)]
pub struct DbStorage<Database, Clock = WallClock> {
    /// The underlying database, shared between clones of this storage.
    pub(crate) database: Arc<Database>,
    /// The time source handed out by `Storage::clock`.
    clock: Clock,
    /// The thread pool handed out by `Storage::thread_pool`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// The Wasm runtime to use, if any.
    wasm_runtime: Option<WasmRuntime>,
    /// User contract code by application ID, shared between clones.
    // NOTE(review): presumably a cache of already loaded contracts — confirm against usage.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// User service code by application ID, shared between clones.
    // NOTE(review): presumably a cache of already loaded services — confirm against usage.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
    /// The configuration copied into every `ChainRuntimeContext` created by `load_chain`.
    execution_runtime_config: ExecutionRuntimeConfig,
}
374
/// The root keys identifying the partitions of the storage.
///
/// A root key is the BCS serialization of one of these variants (see `RootKey::bytes`).
/// The first serialized byte is the variant index, which `CHAIN_ID_TAG` and `BLOB_ID_TAG`
/// rely on; reordering the variants therefore changes the stored keys.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum RootKey {
    /// The partition holding the state of a chain.
    ChainState(ChainId),
    /// The partition holding the lite certificate and block of a confirmed block.
    ConfirmedBlock(CryptoHash),
    /// The partition holding the bytes and the state of a blob.
    Blob(BlobId),
    /// The partition holding the events of a chain.
    Event(ChainId),
    // NOTE(review): presumably kept only so that the following variants retain their
    // serialized index — confirm before removing.
    Placeholder,
    /// The partition holding the network description.
    NetworkDescription,
    /// The partition holding the state of a block exporter.
    // NOTE(review): the `u32` is presumably the exporter's identifier — confirm.
    BlockExporterState(u32),
    /// The partition mapping block heights of a chain to certificate hashes.
    BlockByHeight(ChainId),
}
386
/// The first byte of a serialized `RootKey::ChainState`, i.e. the index of that variant.
const CHAIN_ID_TAG: u8 = 0;
/// The first byte of a serialized `RootKey::Blob`, i.e. the index of that variant.
const BLOB_ID_TAG: u8 = 2;
389
390impl RootKey {
391 pub(crate) fn bytes(&self) -> Vec<u8> {
392 bcs::to_bytes(self).unwrap()
393 }
394}
395
/// The part of an `EventId` that remains once the chain ID is left out.
///
/// Its BCS serialization is the key of an event within the event partition of its chain
/// (see `to_event_key`).
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct RestrictedEventId {
    /// The stream the event belongs to.
    pub stream_id: StreamId,
    /// The index of the event, copied from `EventId::index`.
    pub index: u32,
}
401
402pub(crate) fn to_event_key(event_id: &EventId) -> Vec<u8> {
403 let restricted_event_id = RestrictedEventId {
404 stream_id: event_id.stream_id.clone(),
405 index: event_id.index,
406 };
407 bcs::to_bytes(&restricted_event_id).unwrap()
408}
409
410pub(crate) fn to_height_key(height: BlockHeight) -> Vec<u8> {
411 bcs::to_bytes(&height).unwrap()
412}
413
414fn is_chain_state(root_key: &[u8]) -> bool {
415 if root_key.is_empty() {
416 return false;
417 }
418 root_key[0] == CHAIN_ID_TAG
419}
420
#[cfg(test)]
mod tests {
    use linera_base::{
        crypto::{CryptoHash, TestString},
        data_types::{BlockHeight, Epoch, Round, Timestamp},
        identifiers::{
            ApplicationId, BlobId, BlobType, ChainId, EventId, GenericApplicationId, StreamId,
            StreamName,
        },
    };
    use linera_chain::{
        block::{Block, BlockBody, BlockHeader, ConfirmedBlock},
        types::ConfirmedBlockCertificate,
    };
    use linera_views::{
        memory::MemoryDatabase,
        store::{KeyValueDatabase, ReadableKeyValueStore as _},
    };

    use crate::{
        db_storage::{
            to_event_key, to_height_key, MultiPartitionBatch, RootKey, BLOB_ID_TAG, CHAIN_ID_TAG,
        },
        DbStorage, Storage, TestClock,
    };

    /// Builds an unsigned certificate for an otherwise empty block at `height` on
    /// `chain_id`.
    ///
    /// `label` is appended to the seed of every header hash, so that certificates built
    /// with different labels differ in all of those hashes.
    #[cfg(with_testing)]
    fn make_certificate(
        chain_id: ChainId,
        height: BlockHeight,
        label: impl std::fmt::Display,
    ) -> ConfirmedBlockCertificate {
        let seeded_hash =
            |field: &str| CryptoHash::new(&TestString::new(format!("{field}_{label}")));
        let block = Block {
            header: BlockHeader {
                chain_id,
                epoch: Epoch::ZERO,
                height,
                timestamp: Timestamp::from(0),
                state_hash: seeded_hash("state_hash"),
                previous_block_hash: None,
                authenticated_signer: None,
                transactions_hash: seeded_hash("tx_hash"),
                messages_hash: seeded_hash("msg_hash"),
                previous_message_blocks_hash: seeded_hash("pmb_hash"),
                previous_event_blocks_hash: seeded_hash("peb_hash"),
                oracle_responses_hash: seeded_hash("oracle_hash"),
                events_hash: seeded_hash("events_hash"),
                blobs_hash: seeded_hash("blobs_hash"),
                operation_results_hash: seeded_hash("op_results_hash"),
            },
            body: BlockBody {
                transactions: vec![],
                messages: vec![],
                previous_message_blocks: Default::default(),
                previous_event_blocks: Default::default(),
                oracle_responses: vec![],
                events: vec![],
                blobs: vec![],
                operation_results: vec![],
            },
        };
        ConfirmedBlockCertificate::new(ConfirmedBlock::new(block), Round::Fast, vec![])
    }

    /// A serialized `RootKey::Blob` is the blob tag followed by the serialized blob ID.
    #[test]
    fn test_root_key_blob_serialization() {
        let hash = CryptoHash::default();
        let blob_type = BlobType::default();
        let blob_id = BlobId::new(hash, blob_type);
        let root_key = RootKey::Blob(blob_id).bytes();
        assert_eq!(root_key[0], BLOB_ID_TAG);
        assert_eq!(bcs::from_bytes::<BlobId>(&root_key[1..]).unwrap(), blob_id);
    }

    /// A serialized `RootKey::ChainState` is the chain tag followed by the serialized
    /// chain ID.
    #[test]
    fn test_root_key_chainstate_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let root_key = RootKey::ChainState(chain_id).bytes();
        assert_eq!(root_key[0], CHAIN_ID_TAG);
        assert_eq!(
            bcs::from_bytes::<ChainId>(&root_key[1..]).unwrap(),
            chain_id
        );
    }

    /// An event key starts with the serialized stream ID of the event.
    #[test]
    fn test_root_key_event_serialization() {
        let hash = CryptoHash::test_hash("49");
        let chain_id = ChainId(hash);
        let application_description_hash = CryptoHash::test_hash("42");
        let application_id = ApplicationId::new(application_description_hash);
        let application_id = GenericApplicationId::User(application_id);
        let stream_name = StreamName(bcs::to_bytes("linera_stream").unwrap());
        let stream_id = StreamId {
            application_id,
            stream_name,
        };
        let prefix = bcs::to_bytes(&stream_id).unwrap();

        let index = 1567;
        let event_id = EventId {
            chain_id,
            stream_id,
            index,
        };
        let key = to_event_key(&event_id);
        assert!(key.starts_with(&prefix));
    }

    /// The chain ID and the height can be read back from the root key and the height key.
    #[test]
    fn test_root_key_block_by_height_serialization() {
        let hash = CryptoHash::default();
        let chain_id = ChainId(hash);
        let height = BlockHeight(42);

        // Everything after the tag byte of the root key is the chain ID.
        let root_key = RootKey::BlockByHeight(chain_id).bytes();
        let deserialized_chain_id: ChainId = bcs::from_bytes(&root_key[1..]).unwrap();
        assert_eq!(deserialized_chain_id, chain_id);

        let height_key = to_height_key(height);
        let deserialized_height: BlockHeight = bcs::from_bytes(&height_key).unwrap();
        assert_eq!(deserialized_height, height);
    }

    /// Adding a certificate to a batch also writes the height index entry.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_add_certificate_creates_height_index() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));
        let height = BlockHeight(5);
        let certificate = make_certificate(chain_id, height, "index");

        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&certificate).unwrap();
        storage.write_batch(batch).await.unwrap();

        // Read the index entry directly from the partition of the chain.
        let hash = certificate.hash();
        let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
        let store = storage.database.open_shared(&index_root_key).unwrap();
        let height_key = to_height_key(height);
        let value_bytes = store.read_value_bytes(&height_key).await.unwrap();

        assert!(value_bytes.is_some(), "Height index was not created");
        let stored_hash: CryptoHash = bcs::from_bytes(&value_bytes.unwrap()).unwrap();
        assert_eq!(stored_hash, hash, "Height index contains wrong hash");
    }

    /// Certificates are returned in the order of the requested heights, with `None` for
    /// heights that have no certificate.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        // Store certificates at heights 1, 3 and 5 and remember their hashes in that order.
        let mut batch = MultiPartitionBatch::new();
        let mut expected_hashes = vec![];
        for height in [1, 3, 5] {
            let cert = make_certificate(chain_id, BlockHeight(height), height);
            expected_hashes.push(cert.hash());
            batch.add_certificate(&cert).unwrap();
        }
        storage.write_batch(batch).await.unwrap();

        // Heights requested in ascending order.
        let heights = vec![BlockHeight(1), BlockHeight(3), BlockHeight(5)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        for (found, expected) in result.iter().zip(&expected_hashes) {
            assert_eq!(found.as_ref().unwrap().hash(), *expected);
        }

        // Heights requested out of order: the result follows the request order.
        let heights = vec![BlockHeight(5), BlockHeight(1), BlockHeight(3)];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 3);
        for (found, expected_index) in result.iter().zip([2, 0, 1]) {
            assert_eq!(
                found.as_ref().unwrap().hash(),
                expected_hashes[expected_index]
            );
        }

        // A missing height and a duplicated height.
        let heights = vec![
            BlockHeight(1),
            BlockHeight(2),
            BlockHeight(3),
            BlockHeight(3),
        ];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 4);
        assert!(result[0].is_some());
        assert!(result[1].is_none());
        assert!(result[2].is_some());
        assert_eq!(
            result[2].as_ref().unwrap().hash(),
            result[3].as_ref().unwrap().hash()
        );

        // No heights at all.
        let heights = vec![];
        let result = storage
            .read_certificates_by_heights(chain_id, &heights)
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }

    /// The height index of one chain does not leak into another chain.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_multiple_chains() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;

        let chain_a = ChainId(CryptoHash::test_hash("chain_a"));
        let chain_b = ChainId(CryptoHash::test_hash("chain_b"));

        // Both chains get a block at the same height.
        let cert_a = make_certificate(chain_a, BlockHeight(10), "a");
        let cert_b = make_certificate(chain_b, BlockHeight(10), "b");

        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&cert_a).unwrap();
        batch.add_certificate(&cert_b).unwrap();
        storage.write_batch(batch).await.unwrap();

        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_a.hash());

        let result = storage
            .read_certificates_by_heights(chain_b, &[BlockHeight(10)])
            .await
            .unwrap();
        assert_eq!(result[0].as_ref().unwrap().hash(), cert_b.hash());

        // A height that exists on neither chain.
        let result = storage
            .read_certificates_by_heights(chain_a, &[BlockHeight(20)])
            .await
            .unwrap();
        assert!(result[0].is_none());
    }

    /// Reading a certificate by hash and by height yields the same certificate.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_read_certificates_by_heights_consistency() {
        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let cert = make_certificate(chain_id, BlockHeight(7), "consistency");
        let hash = cert.hash();
        let mut batch = MultiPartitionBatch::new();
        batch.add_certificate(&cert).unwrap();
        storage.write_batch(batch).await.unwrap();

        let cert_by_hash = storage.read_certificate(hash).await.unwrap().unwrap();

        let certs_by_height = storage
            .read_certificates_by_heights(chain_id, &[BlockHeight(7)])
            .await
            .unwrap();
        let cert_by_height = certs_by_height[0].as_ref().unwrap();

        assert_eq!(cert_by_hash.hash(), cert_by_height.hash());
        assert_eq!(
            cert_by_hash.value().block().header,
            cert_by_height.value().block().header
        );
    }

    /// A certificate stored without its height index becomes readable by height once
    /// `write_certificate_height_indices` has been called for it.
    #[cfg(with_testing)]
    #[tokio::test]
    async fn test_write_certificate_height_indices_populates_reverse_index() {
        use linera_views::{batch::Batch, store::WritableKeyValueStore as _};

        let storage = DbStorage::<MemoryDatabase, TestClock>::make_test_storage(None).await;
        let chain_id = ChainId(CryptoHash::test_hash("test_chain"));

        let height = BlockHeight(10);
        let cert = make_certificate(chain_id, height, "reverse_index");
        let hash = cert.hash();

        // Write the certificate by hand, bypassing `add_certificate`, so that no height
        // index entry is created.
        let root_key = RootKey::ConfirmedBlock(hash).bytes();
        let store = storage.database.open_shared(&root_key).unwrap();
        let mut batch = Batch::new();
        batch.put_key_value_bytes(
            crate::db_storage::LITE_CERTIFICATE_KEY.to_vec(),
            bcs::to_bytes(&cert.lite_certificate()).unwrap(),
        );
        batch.put_key_value_bytes(
            crate::db_storage::BLOCK_KEY.to_vec(),
            bcs::to_bytes(&cert.value()).unwrap(),
        );
        store.write_batch(batch).await.unwrap();

        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_none(),
            "Height index should not exist before write_certificate_height_indices"
        );

        let cert_by_hash = storage.read_certificate(hash).await.unwrap();
        assert!(cert_by_hash.is_some(), "Certificate should exist by hash");

        storage
            .write_certificate_height_indices(chain_id, &[(height, hash)])
            .await
            .unwrap();

        let result = storage
            .read_certificates_by_heights(chain_id, &[height])
            .await
            .unwrap();
        assert!(
            result[0].is_some(),
            "Height index should exist after write_certificate_height_indices"
        );
        assert_eq!(
            result[0].as_ref().unwrap().hash(),
            hash,
            "Certificate retrieved by height should match original"
        );
    }
}
969
/// A root key assignment for dual stores that puts the chain states into the first store
/// and everything else into the second store.
#[derive(Clone, Copy)]
pub struct ChainStatesFirstAssignment;
974
975impl DualStoreRootKeyAssignment for ChainStatesFirstAssignment {
976 fn assigned_store(root_key: &[u8]) -> Result<StoreInUse, bcs::Error> {
977 if root_key.is_empty() {
978 return Ok(StoreInUse::Second);
979 }
980 let store = match is_chain_state(root_key) {
981 true => StoreInUse::First,
982 false => StoreInUse::Second,
983 };
984 Ok(store)
985 }
986}
987
/// A `Clock` implementation backed by `Timestamp::now()` and the `linera_base` timer.
///
/// This is the default clock of [`DbStorage`].
#[derive(Clone)]
pub struct WallClock;
991
992#[cfg_attr(not(web), async_trait)]
993#[cfg_attr(web, async_trait(?Send))]
994impl Clock for WallClock {
995 fn current_time(&self) -> Timestamp {
996 Timestamp::now()
997 }
998
999 async fn sleep(&self, delta: TimeDelta) {
1000 linera_base::time::timer::sleep(delta.as_duration()).await
1001 }
1002
1003 async fn sleep_until(&self, timestamp: Timestamp) {
1004 let delta = timestamp.delta_since(Timestamp::now());
1005 if delta > TimeDelta::ZERO {
1006 self.sleep(delta).await
1007 }
1008 }
1009}
1010
/// The state shared by all clones of a `TestClock`.
#[cfg(with_testing)]
#[derive(Default)]
struct TestClockInner {
    /// The current simulated time.
    time: Timestamp,
    /// The pending sleeps, grouped by the time at which they end.
    ///
    /// Keys are wrapped in `Reverse` so that `split_off(&Reverse(now))` yields exactly the
    /// sleeps ending at or before `now`.
    sleeps: BTreeMap<Reverse<Timestamp>, Vec<oneshot::Sender<()>>>,
    /// An optional callback that is invoked with the end time of every newly registered
    /// sleep. If it returns `true` and that time lies in the future, the clock advances to
    /// it right away.
    sleep_callback: Option<Box<dyn Fn(Timestamp) -> bool + Send + Sync>>,
}
1020
#[cfg(with_testing)]
impl TestClockInner {
    /// Sets the current time and completes every sleep that ends at or before it.
    fn set(&mut self, time: Timestamp) {
        self.time = time;
        // Keys are reversed, so everything at or after `Reverse(time)` ends at or before
        // `time`.
        let due = self.sleeps.split_off(&Reverse(time));
        due.into_values().flatten().for_each(|waiter| {
            // A waiter whose receiver is gone is ignored.
            let _ = waiter.send(());
        });
    }

    /// Registers a sleep ending `delta` after the current time (saturating).
    fn add_sleep(&mut self, delta: TimeDelta) -> Receiver<()> {
        let wake_time = self.time.saturating_add(delta);
        self.add_sleep_until(wake_time)
    }

    /// Registers a sleep ending at `time` and returns the receiver that is signalled once
    /// it is over.
    ///
    /// The sleep callback, if any, is consulted first. The receiver is signalled right
    /// away if `time` is not in the future, or if the callback asked to auto-advance, in
    /// which case the clock is moved to `time` beforehand.
    fn add_sleep_until(&mut self, time: Timestamp) -> Receiver<()> {
        let (sender, receiver) = oneshot::channel();
        let auto_advance = match &self.sleep_callback {
            Some(callback) => callback(time),
            None => false,
        };
        let in_future = time > self.time;
        if in_future && !auto_advance {
            // Nothing wakes this sleep yet: park it until the clock reaches `time`.
            self.sleeps.entry(Reverse(time)).or_default().push(sender);
            return receiver;
        }
        if in_future {
            self.set(time);
        }
        let _ = sender.send(());
        receiver
    }
}
1057
/// A clock for tests whose time only changes when it is told to.
///
/// Clones share the same underlying state.
#[cfg(with_testing)]
#[derive(Clone, Default)]
pub struct TestClock(Arc<std::sync::Mutex<TestClockInner>>);
1063
#[cfg(with_testing)]
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
impl Clock for TestClock {
    /// Returns the current simulated time.
    fn current_time(&self) -> Timestamp {
        let inner = self.lock();
        inner.time
    }

    /// Waits until the simulated time has advanced by `delta`; a zero `delta` returns
    /// immediately without registering a sleep.
    async fn sleep(&self, delta: TimeDelta) {
        if delta != TimeDelta::ZERO {
            // Register in a statement of its own: the lock guard is a temporary that must
            // be dropped before awaiting.
            let wakeup = self.lock().add_sleep(delta);
            let _ = wakeup.await;
        }
    }

    /// Waits until the simulated time has reached `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp) {
        // Register in a statement of its own: the lock guard is a temporary that must be
        // dropped before awaiting.
        let wakeup = self.lock().add_sleep_until(timestamp);
        let _ = wakeup.await;
    }
}
1087
#[cfg(with_testing)]
impl TestClock {
    /// Creates a new clock with its time set to the default `Timestamp`.
    pub fn new() -> Self {
        TestClock(Arc::default())
    }

    /// Sets the current time and completes every sleep that ends at or before it.
    pub fn set(&self, time: Timestamp) {
        self.lock().set(time);
    }

    /// Advances the current time by `delta` (saturating) and completes every sleep that
    /// ends at or before the new time.
    pub fn add(&self, delta: TimeDelta) {
        let mut guard = self.lock();
        let time = guard.time.saturating_add(delta);
        guard.set(time);
    }

    /// Returns the current time according to the test clock.
    pub fn current_time(&self) -> Timestamp {
        self.lock().time
    }

    /// Installs a callback that is invoked with the end time of every sleep registered
    /// from now on.
    ///
    /// If the callback returns `true` and that time lies in the future, the clock advances
    /// to it immediately instead of waiting for `set` or `add`. The callback runs while
    /// the clock's mutex is held, so it must not call back into this clock.
    pub fn set_sleep_callback<F>(&self, callback: F)
    where
        F: Fn(Timestamp) -> bool + Send + Sync + 'static,
    {
        self.lock().sleep_callback = Some(Box::new(callback));
    }

    /// Removes the callback installed by `set_sleep_callback`, if any.
    pub fn clear_sleep_callback(&self) {
        self.lock().sleep_callback = None;
    }

    /// Locks the shared clock state.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn lock(&self) -> std::sync::MutexGuard<'_, TestClockInner> {
        self.0.lock().expect("poisoned TestClock mutex")
    }
}
1132
1133#[cfg_attr(not(web), async_trait)]
1134#[cfg_attr(web, async_trait(?Send))]
1135impl<Database, C> Storage for DbStorage<Database, C>
1136where
1137 Database: KeyValueDatabase<
1138 Store: KeyValueStore + Clone + linera_base::util::traits::AutoTraits + 'static,
1139 Error: Send + Sync,
1140 > + Clone
1141 + linera_base::util::traits::AutoTraits
1142 + 'static,
1143 C: Clock + Clone + Send + Sync + 'static,
1144{
1145 type Context = ViewContext<ChainRuntimeContext<Self>, Database::Store>;
1146 type Clock = C;
1147 type BlockExporterContext = ViewContext<u32, Database::Store>;
1148
    /// Returns a reference to the clock held by this storage.
    fn clock(&self) -> &C {
        &self.clock
    }
1152
    /// Returns a reference to the shared execution thread pool held by this storage.
    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
        &self.thread_pool
    }
1156
1157 #[instrument(level = "trace", skip_all, fields(chain_id = %chain_id))]
1158 async fn load_chain(
1159 &self,
1160 chain_id: ChainId,
1161 ) -> Result<ChainStateView<Self::Context>, ViewError> {
1162 #[cfg(with_metrics)]
1163 let _metric = metrics::LOAD_CHAIN_LATENCY.measure_latency();
1164 let runtime_context = ChainRuntimeContext {
1165 storage: self.clone(),
1166 thread_pool: self.thread_pool.clone(),
1167 chain_id,
1168 execution_runtime_config: self.execution_runtime_config,
1169 user_contracts: self.user_contracts.clone(),
1170 user_services: self.user_services.clone(),
1171 };
1172 let root_key = RootKey::ChainState(chain_id).bytes();
1173 let store = self.database.open_exclusive(&root_key)?;
1174 let context = ViewContext::create_root_context(store, runtime_context).await?;
1175 ChainStateView::load(context).await
1176 }
1177
1178 #[instrument(level = "trace", skip_all, fields(%blob_id))]
1179 async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1180 let root_key = RootKey::Blob(blob_id).bytes();
1181 let store = self.database.open_shared(&root_key)?;
1182 let test = store.contains_key(BLOB_KEY).await?;
1183 #[cfg(with_metrics)]
1184 metrics::CONTAINS_BLOB_COUNTER.with_label_values(&[]).inc();
1185 Ok(test)
1186 }
1187
1188 #[instrument(skip_all, fields(blob_count = blob_ids.len()))]
1189 async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError> {
1190 let mut missing_blobs = Vec::new();
1191 for blob_id in blob_ids {
1192 let root_key = RootKey::Blob(*blob_id).bytes();
1193 let store = self.database.open_shared(&root_key)?;
1194 if !store.contains_key(BLOB_KEY).await? {
1195 missing_blobs.push(*blob_id);
1196 }
1197 }
1198 #[cfg(with_metrics)]
1199 metrics::CONTAINS_BLOBS_COUNTER.with_label_values(&[]).inc();
1200 Ok(missing_blobs)
1201 }
1202
1203 #[instrument(skip_all, fields(%blob_id))]
1204 async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError> {
1205 let root_key = RootKey::Blob(blob_id).bytes();
1206 let store = self.database.open_shared(&root_key)?;
1207 let test = store.contains_key(BLOB_STATE_KEY).await?;
1208 #[cfg(with_metrics)]
1209 metrics::CONTAINS_BLOB_STATE_COUNTER
1210 .with_label_values(&[])
1211 .inc();
1212 Ok(test)
1213 }
1214
1215 #[instrument(skip_all, fields(%hash))]
1216 async fn read_confirmed_block(
1217 &self,
1218 hash: CryptoHash,
1219 ) -> Result<Option<ConfirmedBlock>, ViewError> {
1220 let root_key = RootKey::ConfirmedBlock(hash).bytes();
1221 let store = self.database.open_shared(&root_key)?;
1222 let value = store.read_value(BLOCK_KEY).await?;
1223 #[cfg(with_metrics)]
1224 metrics::READ_CONFIRMED_BLOCK_COUNTER
1225 .with_label_values(&[])
1226 .inc();
1227 Ok(value)
1228 }
1229
1230 #[instrument(skip_all)]
1231 async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
1232 &self,
1233 hashes: I,
1234 ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError> {
1235 let hashes = hashes.into_iter().collect::<Vec<_>>();
1236 if hashes.is_empty() {
1237 return Ok(Vec::new());
1238 }
1239 let root_keys = Self::get_root_keys_for_certificates(&hashes);
1240 let mut blocks = Vec::new();
1241 for root_key in root_keys {
1242 let store = self.database.open_shared(&root_key)?;
1243 blocks.push(store.read_value(BLOCK_KEY).await?);
1244 }
1245 #[cfg(with_metrics)]
1246 metrics::READ_CONFIRMED_BLOCKS_COUNTER
1247 .with_label_values(&[])
1248 .inc_by(hashes.len() as u64);
1249 Ok(blocks)
1250 }
1251
1252 #[instrument(skip_all, fields(%blob_id))]
1253 async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
1254 let root_key = RootKey::Blob(blob_id).bytes();
1255 let store = self.database.open_shared(&root_key)?;
1256 let maybe_blob_bytes = store.read_value_bytes(BLOB_KEY).await?;
1257 #[cfg(with_metrics)]
1258 metrics::READ_BLOB_COUNTER.with_label_values(&[]).inc();
1259 Ok(maybe_blob_bytes.map(|blob_bytes| Blob::new_with_id_unchecked(blob_id, blob_bytes)))
1260 }
1261
1262 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1263 async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError> {
1264 if blob_ids.is_empty() {
1265 return Ok(Vec::new());
1266 }
1267 let mut blobs = Vec::new();
1268 for blob_id in blob_ids {
1269 blobs.push(self.read_blob(*blob_id).await?);
1270 }
1271 #[cfg(with_metrics)]
1272 metrics::READ_BLOB_COUNTER
1273 .with_label_values(&[])
1274 .inc_by(blob_ids.len() as u64);
1275 Ok(blobs)
1276 }
1277
1278 #[instrument(skip_all, fields(%blob_id))]
1279 async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError> {
1280 let root_key = RootKey::Blob(blob_id).bytes();
1281 let store = self.database.open_shared(&root_key)?;
1282 let blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1283 #[cfg(with_metrics)]
1284 metrics::READ_BLOB_STATE_COUNTER
1285 .with_label_values(&[])
1286 .inc();
1287 Ok(blob_state)
1288 }
1289
1290 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1291 async fn read_blob_states(
1292 &self,
1293 blob_ids: &[BlobId],
1294 ) -> Result<Vec<Option<BlobState>>, ViewError> {
1295 if blob_ids.is_empty() {
1296 return Ok(Vec::new());
1297 }
1298 let mut blob_states = Vec::new();
1299 for blob_id in blob_ids {
1300 blob_states.push(self.read_blob_state(*blob_id).await?);
1301 }
1302 #[cfg(with_metrics)]
1303 metrics::READ_BLOB_STATES_COUNTER
1304 .with_label_values(&[])
1305 .inc_by(blob_ids.len() as u64);
1306 Ok(blob_states)
1307 }
1308
1309 #[instrument(skip_all, fields(blob_id = %blob.id()))]
1310 async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> {
1311 let mut batch = MultiPartitionBatch::new();
1312 batch.add_blob(blob)?;
1313 self.write_batch(batch).await?;
1314 Ok(())
1315 }
1316
1317 #[instrument(skip_all, fields(blob_ids_len = %blob_ids.len()))]
1318 async fn maybe_write_blob_states(
1319 &self,
1320 blob_ids: &[BlobId],
1321 blob_state: BlobState,
1322 ) -> Result<(), ViewError> {
1323 if blob_ids.is_empty() {
1324 return Ok(());
1325 }
1326 let mut maybe_blob_states = Vec::new();
1327 for blob_id in blob_ids {
1328 let root_key = RootKey::Blob(*blob_id).bytes();
1329 let store = self.database.open_shared(&root_key)?;
1330 let maybe_blob_state = store.read_value::<BlobState>(BLOB_STATE_KEY).await?;
1331 maybe_blob_states.push(maybe_blob_state);
1332 }
1333 let mut batch = MultiPartitionBatch::new();
1334 for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
1335 match maybe_blob_state {
1336 None => {
1337 batch.add_blob_state(*blob_id, &blob_state)?;
1338 }
1339 Some(state) => {
1340 if state.epoch < blob_state.epoch {
1341 batch.add_blob_state(*blob_id, &blob_state)?;
1342 }
1343 }
1344 }
1345 }
1346 self.write_batch(batch).await?;
1350 Ok(())
1351 }
1352
1353 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1354 async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError> {
1355 if blobs.is_empty() {
1356 return Ok(Vec::new());
1357 }
1358 let mut batch = MultiPartitionBatch::new();
1359 let mut blob_states = Vec::new();
1360 for blob in blobs {
1361 let root_key = RootKey::Blob(blob.id()).bytes();
1362 let store = self.database.open_shared(&root_key)?;
1363 let has_state = store.contains_key(BLOB_STATE_KEY).await?;
1364 blob_states.push(has_state);
1365 if has_state {
1366 batch.add_blob(blob)?;
1367 }
1368 }
1369 self.write_batch(batch).await?;
1370 Ok(blob_states)
1371 }
1372
1373 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1374 async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError> {
1375 if blobs.is_empty() {
1376 return Ok(());
1377 }
1378 let mut batch = MultiPartitionBatch::new();
1379 for blob in blobs {
1380 batch.add_blob(blob)?;
1381 }
1382 self.write_batch(batch).await
1383 }
1384
1385 #[instrument(skip_all, fields(blobs_len = %blobs.len()))]
1386 async fn write_blobs_and_certificate(
1387 &self,
1388 blobs: &[Blob],
1389 certificate: &ConfirmedBlockCertificate,
1390 ) -> Result<(), ViewError> {
1391 let mut batch = MultiPartitionBatch::new();
1392 for blob in blobs {
1393 batch.add_blob(blob)?;
1394 }
1395 batch.add_certificate(certificate)?;
1396 self.write_batch(batch).await
1397 }
1398
1399 #[instrument(skip_all, fields(%hash))]
1400 async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError> {
1401 let root_key = RootKey::ConfirmedBlock(hash).bytes();
1402 let store = self.database.open_shared(&root_key)?;
1403 let results = store.contains_keys(&get_block_keys()).await?;
1404 #[cfg(with_metrics)]
1405 metrics::CONTAINS_CERTIFICATE_COUNTER
1406 .with_label_values(&[])
1407 .inc();
1408 Ok(results[0] && results[1])
1409 }
1410
1411 #[instrument(skip_all, fields(%hash))]
1412 async fn read_certificate(
1413 &self,
1414 hash: CryptoHash,
1415 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1416 let root_key = RootKey::ConfirmedBlock(hash).bytes();
1417 let store = self.database.open_shared(&root_key)?;
1418 let values = store.read_multi_values_bytes(&get_block_keys()).await?;
1419 #[cfg(with_metrics)]
1420 metrics::READ_CERTIFICATE_COUNTER
1421 .with_label_values(&[])
1422 .inc();
1423 Self::deserialize_certificate(&values, hash)
1424 }
1425
1426 #[instrument(skip_all)]
1427 async fn read_certificates(
1428 &self,
1429 hashes: &[CryptoHash],
1430 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1431 let raw_certs = self.read_certificates_raw(hashes).await?;
1432
1433 raw_certs
1434 .into_iter()
1435 .zip(hashes)
1436 .map(|(maybe_raw, hash)| {
1437 let Some((lite_cert_bytes, confirmed_block_bytes)) = maybe_raw else {
1438 return Ok(None);
1439 };
1440 let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1441 let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1442 assert_eq!(&value.hash(), hash);
1443 let certificate = cert
1444 .with_value(value)
1445 .ok_or(ViewError::InconsistentEntries)?;
1446 Ok(Some(certificate))
1447 })
1448 .collect()
1449 }
1450
1451 #[instrument(skip_all)]
1452 async fn read_certificates_raw(
1453 &self,
1454 hashes: &[CryptoHash],
1455 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
1456 if hashes.is_empty() {
1457 return Ok(Vec::new());
1458 }
1459 let root_keys = Self::get_root_keys_for_certificates(hashes);
1460 let mut values = Vec::new();
1461 for root_key in root_keys {
1462 let store = self.database.open_shared(&root_key)?;
1463 values.extend(store.read_multi_values_bytes(&get_block_keys()).await?);
1464 }
1465 #[cfg(with_metrics)]
1466 metrics::READ_CERTIFICATES_COUNTER
1467 .with_label_values(&[])
1468 .inc_by(hashes.len() as u64);
1469 Ok(values
1470 .chunks_exact(2)
1471 .map(|chunk| {
1472 let lite_cert_bytes = chunk[0].as_ref()?;
1473 let confirmed_block_bytes = chunk[1].as_ref()?;
1474 Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
1475 })
1476 .collect())
1477 }
1478
1479 async fn read_certificate_hashes_by_heights(
1480 &self,
1481 chain_id: ChainId,
1482 heights: &[BlockHeight],
1483 ) -> Result<Vec<Option<CryptoHash>>, ViewError> {
1484 if heights.is_empty() {
1485 return Ok(Vec::new());
1486 }
1487
1488 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1489 let store = self.database.open_shared(&index_root_key)?;
1490 let height_keys: Vec<Vec<u8>> = heights.iter().map(|h| to_height_key(*h)).collect();
1491 let hash_bytes = store.read_multi_values_bytes(&height_keys).await?;
1492 let hash_options: Vec<Option<CryptoHash>> = hash_bytes
1493 .into_iter()
1494 .map(|opt| {
1495 opt.map(|bytes| bcs::from_bytes::<CryptoHash>(&bytes))
1496 .transpose()
1497 })
1498 .collect::<Result<_, _>>()?;
1499
1500 Ok(hash_options)
1501 }
1502
1503 #[instrument(skip_all)]
1504 async fn read_certificates_by_heights_raw(
1505 &self,
1506 chain_id: ChainId,
1507 heights: &[BlockHeight],
1508 ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError> {
1509 let hashes: Vec<Option<CryptoHash>> = self
1510 .read_certificate_hashes_by_heights(chain_id, heights)
1511 .await?;
1512
1513 let mut indices: HashMap<CryptoHash, Vec<usize>> = HashMap::new();
1515 for (index, maybe_hash) in hashes.iter().enumerate() {
1516 if let Some(hash) = maybe_hash {
1517 indices.entry(*hash).or_default().push(index);
1518 }
1519 }
1520
1521 let unique_hashes = indices.keys().copied().collect::<Vec<_>>();
1523
1524 let mut result = vec![None; heights.len()];
1525
1526 for (raw_cert, hash) in self
1527 .read_certificates_raw(&unique_hashes)
1528 .await?
1529 .into_iter()
1530 .zip(unique_hashes)
1531 {
1532 if let Some(idx_list) = indices.get(&hash) {
1533 for &index in idx_list {
1534 result[index] = raw_cert.clone();
1535 }
1536 } else {
1537 tracing::warn!(
1539 hash=?hash,
1540 "certificate hash not found in indices map",
1541 );
1542 }
1543 }
1544
1545 Ok(result)
1546 }
1547
1548 #[instrument(skip_all, fields(%chain_id, heights_len = heights.len()))]
1549 async fn read_certificates_by_heights(
1550 &self,
1551 chain_id: ChainId,
1552 heights: &[BlockHeight],
1553 ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError> {
1554 self.read_certificates_by_heights_raw(chain_id, heights)
1555 .await?
1556 .into_iter()
1557 .map(|maybe_raw| match maybe_raw {
1558 None => Ok(None),
1559 Some((lite_cert_bytes, confirmed_block_bytes)) => {
1560 let cert = bcs::from_bytes::<LiteCertificate>(&lite_cert_bytes)?;
1561 let value = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block_bytes)?;
1562 let certificate = cert
1563 .with_value(value)
1564 .ok_or(ViewError::InconsistentEntries)?;
1565 Ok(Some(certificate))
1566 }
1567 })
1568 .collect()
1569 }
1570
1571 #[instrument(skip_all, fields(%chain_id, indices_len = indices.len()))]
1572 async fn write_certificate_height_indices(
1573 &self,
1574 chain_id: ChainId,
1575 indices: &[(BlockHeight, CryptoHash)],
1576 ) -> Result<(), ViewError> {
1577 if indices.is_empty() {
1578 return Ok(());
1579 }
1580
1581 let mut batch = MultiPartitionBatch::new();
1582 let index_root_key = RootKey::BlockByHeight(chain_id).bytes();
1583 let key_values: Vec<(Vec<u8>, Vec<u8>)> = indices
1584 .iter()
1585 .map(|(height, hash)| {
1586 let height_key = to_height_key(*height);
1587 let hash_value = bcs::to_bytes(hash).unwrap();
1588 (height_key, hash_value)
1589 })
1590 .collect();
1591 batch.put_key_values(index_root_key, key_values);
1592 self.write_batch(batch).await
1593 }
1594
1595 #[instrument(skip_all, fields(event_id = ?event_id))]
1596 async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
1597 let event_key = to_event_key(&event_id);
1598 let root_key = RootKey::Event(event_id.chain_id).bytes();
1599 let store = self.database.open_shared(&root_key)?;
1600 let event = store.read_value_bytes(&event_key).await?;
1601 #[cfg(with_metrics)]
1602 metrics::READ_EVENT_COUNTER.with_label_values(&[]).inc();
1603 Ok(event)
1604 }
1605
1606 #[instrument(skip_all, fields(event_id = ?event_id))]
1607 async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
1608 let event_key = to_event_key(&event_id);
1609 let root_key = RootKey::Event(event_id.chain_id).bytes();
1610 let store = self.database.open_shared(&root_key)?;
1611 let exists = store.contains_key(&event_key).await?;
1612 #[cfg(with_metrics)]
1613 metrics::CONTAINS_EVENT_COUNTER.with_label_values(&[]).inc();
1614 Ok(exists)
1615 }
1616
1617 #[instrument(skip_all, fields(%chain_id, %stream_id, %start_index))]
1618 async fn read_events_from_index(
1619 &self,
1620 chain_id: &ChainId,
1621 stream_id: &StreamId,
1622 start_index: u32,
1623 ) -> Result<Vec<IndexAndEvent>, ViewError> {
1624 let root_key = RootKey::Event(*chain_id).bytes();
1625 let store = self.database.open_shared(&root_key)?;
1626 let mut keys = Vec::new();
1627 let mut indices = Vec::new();
1628 let prefix = bcs::to_bytes(stream_id).unwrap();
1629 for short_key in store.find_keys_by_prefix(&prefix).await? {
1630 let index = bcs::from_bytes::<u32>(&short_key)?;
1631 if index >= start_index {
1632 let mut key = prefix.clone();
1633 key.extend(short_key);
1634 keys.push(key);
1635 indices.push(index);
1636 }
1637 }
1638 let values = store.read_multi_values_bytes(&keys).await?;
1639 let mut returned_values = Vec::new();
1640 for (index, value) in indices.into_iter().zip(values) {
1641 let event = value.unwrap();
1642 returned_values.push(IndexAndEvent { index, event });
1643 }
1644 Ok(returned_values)
1645 }
1646
1647 #[instrument(skip_all)]
1648 async fn write_events(
1649 &self,
1650 events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
1651 ) -> Result<(), ViewError> {
1652 let mut batch = MultiPartitionBatch::new();
1653 for (event_id, value) in events {
1654 batch.add_event(event_id, value)?;
1655 }
1656 self.write_batch(batch).await
1657 }
1658
1659 #[instrument(skip_all)]
1660 async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
1661 let root_key = RootKey::NetworkDescription.bytes();
1662 let store = self.database.open_shared(&root_key)?;
1663 let maybe_value = store.read_value(NETWORK_DESCRIPTION_KEY).await?;
1664 #[cfg(with_metrics)]
1665 metrics::READ_NETWORK_DESCRIPTION
1666 .with_label_values(&[])
1667 .inc();
1668 Ok(maybe_value)
1669 }
1670
1671 #[instrument(skip_all)]
1672 async fn write_network_description(
1673 &self,
1674 information: &NetworkDescription,
1675 ) -> Result<(), ViewError> {
1676 let mut batch = MultiPartitionBatch::new();
1677 batch.add_network_description(information)?;
1678 self.write_batch(batch).await?;
1679 Ok(())
1680 }
1681
    /// Returns the Wasm runtime configured for this storage, if any.
    fn wasm_runtime(&self) -> Option<WasmRuntime> {
        self.wasm_runtime
    }
1685
1686 #[instrument(skip_all)]
1687 async fn block_exporter_context(
1688 &self,
1689 block_exporter_id: u32,
1690 ) -> Result<Self::BlockExporterContext, ViewError> {
1691 let root_key = RootKey::BlockExporterState(block_exporter_id).bytes();
1692 let store = self.database.open_exclusive(&root_key)?;
1693 Ok(ViewContext::create_root_context(store, block_exporter_id).await?)
1694 }
1695}
1696
1697impl<Database, C> DbStorage<Database, C>
1698where
1699 Database: KeyValueDatabase + Clone,
1700 Database::Store: KeyValueStore + Clone,
1701 C: Clock,
1702 Database::Error: Send + Sync,
1703{
1704 #[instrument(skip_all)]
1705 fn get_root_keys_for_certificates(hashes: &[CryptoHash]) -> Vec<Vec<u8>> {
1706 hashes
1707 .iter()
1708 .map(|hash| RootKey::ConfirmedBlock(*hash).bytes())
1709 .collect()
1710 }
1711
1712 #[instrument(skip_all)]
1713 fn deserialize_certificate(
1714 pair: &[Option<Vec<u8>>],
1715 hash: CryptoHash,
1716 ) -> Result<Option<ConfirmedBlockCertificate>, ViewError> {
1717 let Some(cert_bytes) = pair[0].as_ref() else {
1718 return Ok(None);
1719 };
1720 let Some(value_bytes) = pair[1].as_ref() else {
1721 return Ok(None);
1722 };
1723 let cert = bcs::from_bytes::<LiteCertificate>(cert_bytes)?;
1724 let value = bcs::from_bytes::<ConfirmedBlock>(value_bytes)?;
1725 assert_eq!(value.hash(), hash);
1726 let certificate = cert
1727 .with_value(value)
1728 .ok_or(ViewError::InconsistentEntries)?;
1729 Ok(Some(certificate))
1730 }
1731
1732 #[instrument(skip_all)]
1733 async fn write_entry(
1734 store: &Database::Store,
1735 key_values: Vec<(Vec<u8>, Vec<u8>)>,
1736 ) -> Result<(), ViewError> {
1737 let mut batch = Batch::new();
1738 for (key, value) in key_values {
1739 batch.put_key_value_bytes(key, value);
1740 }
1741 store.write_batch(batch).await?;
1742 Ok(())
1743 }
1744
1745 #[instrument(skip_all, fields(batch_size = batch.keys_value_bytes.len()))]
1746 pub(crate) async fn write_batch(&self, batch: MultiPartitionBatch) -> Result<(), ViewError> {
1747 if batch.keys_value_bytes.is_empty() {
1748 return Ok(());
1749 }
1750 let mut futures = Vec::new();
1751 for (root_key, key_values) in batch.keys_value_bytes {
1752 let store = self.database.open_shared(&root_key)?;
1753 futures.push(async move { Self::write_entry(&store, key_values).await });
1754 }
1755 futures::future::try_join_all(futures).await?;
1756 Ok(())
1757 }
1758}
1759
impl<Database, C> DbStorage<Database, C>
where
    Database: KeyValueDatabase + Clone + 'static,
    Database::Error: Send + Sync,
    Database::Store: KeyValueStore + Clone + 'static,
    C: Clock + Clone + Send + Sync + 'static,
{
    /// Builds a storage on top of `database`, with `clock` as its clock.
    ///
    /// The `user_contracts` and `user_services` maps start empty and the execution runtime
    /// configuration is the default one.
    pub(crate) fn new(database: Database, wasm_runtime: Option<WasmRuntime>, clock: C) -> Self {
        Self {
            database: Arc::new(database),
            clock,
            #[cfg_attr(web, expect(clippy::arc_with_non_send_sync))]
            // The pool is created with a fixed argument of 20.
            // NOTE(review): presumably the number of threads — confirm against `ThreadPool::new`.
            thread_pool: Arc::new(linera_execution::ThreadPool::new(20)),
            wasm_runtime,
            user_contracts: Arc::new(papaya::HashMap::new()),
            user_services: Arc::new(papaya::HashMap::new()),
            execution_runtime_config: ExecutionRuntimeConfig::default(),
        }
    }

    /// Sets the `allow_application_logs` flag of the execution runtime configuration and
    /// returns the updated storage (builder style).
    pub fn with_allow_application_logs(mut self, allow: bool) -> Self {
        self.execution_runtime_config.allow_application_logs = allow;
        self
    }
}
1787
1788impl<Database> DbStorage<Database, WallClock>
1789where
1790 Database: KeyValueDatabase + Clone + 'static,
1791 Database::Error: Send + Sync,
1792 Database::Store: KeyValueStore + Clone + 'static,
1793{
1794 pub async fn maybe_create_and_connect(
1795 config: &Database::Config,
1796 namespace: &str,
1797 wasm_runtime: Option<WasmRuntime>,
1798 ) -> Result<Self, ViewError> {
1799 let database = Database::maybe_create_and_connect(config, namespace).await?;
1800 let storage = Self::new(database, wasm_runtime, WallClock);
1801 Ok(storage)
1802 }
1803
1804 pub async fn connect(
1805 config: &Database::Config,
1806 namespace: &str,
1807 wasm_runtime: Option<WasmRuntime>,
1808 ) -> Result<Self, ViewError> {
1809 let database = Database::connect(config, namespace).await?;
1810 let storage = Self::new(database, wasm_runtime, WallClock);
1811 Ok(storage)
1812 }
1813
1814 pub async fn list_blob_ids(
1816 config: &Database::Config,
1817 namespace: &str,
1818 ) -> Result<Vec<BlobId>, ViewError> {
1819 let database = Database::connect(config, namespace).await?;
1820 let root_keys = database.list_root_keys().await?;
1821 let mut blob_ids = Vec::new();
1822 for root_key in root_keys {
1823 if !root_key.is_empty() && root_key[0] == BLOB_ID_TAG {
1824 let root_key_red = &root_key[1..];
1825 let blob_id = bcs::from_bytes(root_key_red)?;
1826 blob_ids.push(blob_id);
1827 }
1828 }
1829 Ok(blob_ids)
1830 }
1831}
1832
1833impl<Database> DbStorage<Database, WallClock>
1834where
1835 Database: KeyValueDatabase + Clone + Send + Sync + 'static,
1836 Database::Error: Send + Sync,
1837{
1838 pub async fn list_chain_ids(
1840 config: &Database::Config,
1841 namespace: &str,
1842 ) -> Result<Vec<ChainId>, ViewError> {
1843 let database = Database::connect(config, namespace).await?;
1844 let root_keys = database.list_root_keys().await?;
1845 let mut chain_ids = Vec::new();
1846 for root_key in root_keys {
1847 if !root_key.is_empty() && root_key[0] == CHAIN_ID_TAG {
1848 let root_key_red = &root_key[1..];
1849 let chain_id = bcs::from_bytes(root_key_red)?;
1850 chain_ids.push(chain_id);
1851 }
1852 }
1853 Ok(chain_ids)
1854 }
1855}
1856
#[cfg(with_testing)]
impl<Database> DbStorage<Database, TestClock>
where
    Database: TestKeyValueDatabase + Clone + Send + Sync + 'static,
    Database::Store: KeyValueStore + Clone + Send + Sync + 'static,
    Database::Error: Send + Sync,
{
    /// Creates a test storage with a fresh test configuration, a generated namespace and
    /// a new [`TestClock`].
    ///
    /// # Panics
    ///
    /// Panics if the test configuration or the storage cannot be created.
    pub async fn make_test_storage(wasm_runtime: Option<WasmRuntime>) -> Self {
        let config = Database::new_test_config().await.unwrap();
        let namespace = generate_test_namespace();
        let clock = TestClock::new();
        Self::new_for_testing(config, &namespace, wasm_runtime, clock)
            .await
            .unwrap()
    }

    /// Recreates `namespace`, connects to it and builds a storage using `clock`.
    ///
    /// The new storage is checked with `assert_is_migrated_storage` before being returned.
    pub async fn new_for_testing(
        config: Database::Config,
        namespace: &str,
        wasm_runtime: Option<WasmRuntime>,
        clock: TestClock,
    ) -> Result<Self, ViewError> {
        let storage = Self::new(
            Database::recreate_and_connect(&config, namespace).await?,
            wasm_runtime,
            clock,
        );
        storage.assert_is_migrated_storage().await?;
        Ok(storage)
    }
}