Skip to main content

linera_storage/
lib.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module defines the storage abstractions for individual chains and certificates.
5
6mod db_storage;
7mod migration;
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use itertools::Itertools;
13use linera_base::{
14    crypto::CryptoHash,
15    data_types::{
16        ApplicationDescription, Blob, BlockHeight, ChainDescription, CompressedBytecode,
17        NetworkDescription, TimeDelta, Timestamp,
18    },
19    identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
20    vm::VmRuntime,
21};
22use linera_chain::{
23    types::{ConfirmedBlock, ConfirmedBlockCertificate},
24    ChainError, ChainStateView,
25};
26#[cfg(with_revm)]
27use linera_execution::{
28    evm::revm::{EvmContractModule, EvmServiceModule},
29    EvmRuntime,
30};
31use linera_execution::{
32    BlobState, ExecutionError, ExecutionRuntimeConfig, ExecutionRuntimeContext, TransactionTracker,
33    UserContractCode, UserServiceCode, WasmRuntime,
34};
35#[cfg(with_wasm_runtime)]
36use linera_execution::{WasmContractModule, WasmServiceModule};
37use linera_views::{context::Context, views::RootView, ViewError};
38
39#[cfg(with_metrics)]
40pub use crate::db_storage::metrics;
41#[cfg(with_testing)]
42pub use crate::db_storage::TestClock;
43pub use crate::db_storage::{ChainStatesFirstAssignment, DbStorage, WallClock};
44
/// The default namespace to be used when none is specified.
pub const DEFAULT_NAMESPACE: &str = "table_linera";
47
48/// Communicate with a persistent storage using the "views" abstraction.
49#[cfg_attr(not(web), async_trait)]
50#[cfg_attr(web, async_trait(?Send))]
51pub trait Storage: linera_base::util::traits::AutoTraits + Sized {
52    /// The low-level storage implementation in use by the core protocol (chain workers etc).
53    type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + 'static;
54
55    /// The clock type being used.
56    type Clock: Clock + Clone + Send + Sync;
57
58    /// The low-level storage implementation in use by the block exporter.
59    type BlockExporterContext: Context<Extra = u32> + Clone;
60
61    /// Returns the current wall clock time.
62    fn clock(&self) -> &Self::Clock;
63
64    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool>;
65
66    /// Loads the view of a chain state.
67    ///
68    /// # Notes
69    ///
70    /// Each time this method is called, a new [`ChainStateView`] is created. If there are multiple
71    /// instances of the same chain active at any given moment, they will race to access persistent
72    /// storage. This can lead to invalid states and data corruption.
73    async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
74
75    /// Tests the existence of a blob with the given blob ID.
76    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
77
78    /// Returns what blobs from the input are missing from storage.
79    async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<BlobId>, ViewError>;
80
81    /// Tests existence of a blob state with the given blob ID.
82    async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
83
84    /// Reads the hashed certificate value with the given hash.
85    async fn read_confirmed_block(
86        &self,
87        hash: CryptoHash,
88    ) -> Result<Option<ConfirmedBlock>, ViewError>;
89
90    /// Reads a number of confirmed blocks by their hashes.
91    async fn read_confirmed_blocks<I: IntoIterator<Item = CryptoHash> + Send>(
92        &self,
93        hashes: I,
94    ) -> Result<Vec<Option<ConfirmedBlock>>, ViewError>;
95
96    /// Reads the blob with the given blob ID.
97    async fn read_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError>;
98
99    /// Reads the blobs with the given blob IDs.
100    async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
101
102    /// Reads the blob state with the given blob ID.
103    async fn read_blob_state(&self, blob_id: BlobId) -> Result<Option<BlobState>, ViewError>;
104
105    /// Reads the blob states with the given blob IDs.
106    async fn read_blob_states(
107        &self,
108        blob_ids: &[BlobId],
109    ) -> Result<Vec<Option<BlobState>>, ViewError>;
110
111    /// Writes the given blob.
112    async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
113
114    /// Writes blobs and certificate
115    async fn write_blobs_and_certificate(
116        &self,
117        blobs: &[Blob],
118        certificate: &ConfirmedBlockCertificate,
119    ) -> Result<(), ViewError>;
120
121    /// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
122    /// blobs that were written.
123    async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
124
125    /// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
126    async fn maybe_write_blob_states(
127        &self,
128        blob_ids: &[BlobId],
129        blob_state: BlobState,
130    ) -> Result<(), ViewError>;
131
132    /// Writes several blobs.
133    async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
134
135    /// Tests existence of the certificate with the given hash.
136    async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
137
138    /// Reads the certificate with the given hash.
139    async fn read_certificate(
140        &self,
141        hash: CryptoHash,
142    ) -> Result<Option<ConfirmedBlockCertificate>, ViewError>;
143
144    /// Reads a number of certificates
145    async fn read_certificates(
146        &self,
147        hashes: &[CryptoHash],
148    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
149
150    /// Reads raw certificate bytes by hashes.
151    ///
152    /// Returns a vector where each element corresponds to the input hash.
153    /// Elements are `None` if no certificate exists for that hash.
154    /// Each found certificate is returned as `Some((lite_certificate_bytes, confirmed_block_bytes))`.
155    async fn read_certificates_raw(
156        &self,
157        hashes: &[CryptoHash],
158    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
159
160    /// Reads certificates by heights for a given chain.
161    /// Returns a vector where each element corresponds to the input height.
162    /// Elements are `None` if no certificate exists at that height.
163    async fn read_certificates_by_heights(
164        &self,
165        chain_id: ChainId,
166        heights: &[BlockHeight],
167    ) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
168
169    /// Reads raw certificates by heights for a given chain.
170    /// Returns a vector where each element corresponds to the input height.
171    /// Elements are `None` if no certificate exists at that height.
172    /// Each found certificate is returned as a tuple of (lite_certificate_bytes, confirmed_block_bytes).
173    async fn read_certificates_by_heights_raw(
174        &self,
175        chain_id: ChainId,
176        heights: &[BlockHeight],
177    ) -> Result<Vec<Option<(Vec<u8>, Vec<u8>)>>, ViewError>;
178
179    /// Returns a vector of certificate hashes for the requested chain and heights.
180    /// The resulting vector maintains the order of the input `heights` argument.
181    /// Elements are `None` if no certificate exists at that height.
182    async fn read_certificate_hashes_by_heights(
183        &self,
184        chain_id: ChainId,
185        heights: &[BlockHeight],
186    ) -> Result<Vec<Option<CryptoHash>>, ViewError>;
187
188    /// Writes certificate height index entries for a given chain.
189    /// This is used to populate the height->hash index when certificates are found
190    /// via alternative methods (e.g., from chain state).
191    async fn write_certificate_height_indices(
192        &self,
193        chain_id: ChainId,
194        indices: &[(BlockHeight, CryptoHash)],
195    ) -> Result<(), ViewError>;
196
197    /// Reads the event with the given ID.
198    async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
199
200    /// Tests existence of the event with the given ID.
201    async fn contains_event(&self, id: EventId) -> Result<bool, ViewError>;
202
203    /// Lists all the events from a starting index
204    async fn read_events_from_index(
205        &self,
206        chain_id: &ChainId,
207        stream_id: &StreamId,
208        start_index: u32,
209    ) -> Result<Vec<IndexAndEvent>, ViewError>;
210
211    /// Writes a vector of events.
212    async fn write_events(
213        &self,
214        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
215    ) -> Result<(), ViewError>;
216
217    /// Reads the network description.
218    async fn read_network_description(&self) -> Result<Option<NetworkDescription>, ViewError>;
219
220    /// Writes the network description.
221    async fn write_network_description(
222        &self,
223        information: &NetworkDescription,
224    ) -> Result<(), ViewError>;
225
226    /// Initializes a chain in a simple way (used for testing and to create a genesis state).
227    ///
228    /// # Notes
229    ///
230    /// This method creates a new [`ChainStateView`] instance. If there are multiple instances of
231    /// the same chain active at any given moment, they will race to access persistent storage.
232    /// This can lead to invalid states and data corruption.
233    async fn create_chain(&self, description: ChainDescription) -> Result<(), ChainError>
234    where
235        ChainRuntimeContext<Self>: ExecutionRuntimeContext,
236    {
237        let id = description.id();
238        // Store the description blob.
239        self.write_blob(&Blob::new_chain_description(&description))
240            .await?;
241        let mut chain = self.load_chain(id).await?;
242        assert!(!chain.is_active(), "Attempting to create a chain twice");
243        let current_time = self.clock().current_time();
244        chain.initialize_if_needed(current_time).await?;
245        chain.save().await?;
246        Ok(())
247    }
248
249    /// Selects the WebAssembly runtime to use for applications (if any).
250    fn wasm_runtime(&self) -> Option<WasmRuntime>;
251
252    /// Creates a [`UserContractCode`] instance using the bytecode in storage referenced
253    /// by the `application_description`.
254    async fn load_contract(
255        &self,
256        application_description: &ApplicationDescription,
257        txn_tracker: &TransactionTracker,
258    ) -> Result<UserContractCode, ExecutionError> {
259        let contract_bytecode_blob_id = application_description.contract_bytecode_blob_id();
260        let content = match txn_tracker.get_blob_content(&contract_bytecode_blob_id) {
261            Some(content) => content.clone(),
262            None => self
263                .read_blob(contract_bytecode_blob_id)
264                .await?
265                .ok_or(ExecutionError::BlobsNotFound(vec![
266                    contract_bytecode_blob_id,
267                ]))?
268                .into_content(),
269        };
270        let compressed_contract_bytecode = CompressedBytecode {
271            compressed_bytes: content.into_arc_bytes(),
272        };
273        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
274        let contract_bytecode = self
275            .thread_pool()
276            .run_send((), move |()| async move {
277                compressed_contract_bytecode.decompress()
278            })
279            .await
280            .await??;
281        match application_description.module_id.vm_runtime {
282            VmRuntime::Wasm => {
283                cfg_if::cfg_if! {
284                    if #[cfg(with_wasm_runtime)] {
285                        let Some(wasm_runtime) = self.wasm_runtime() else {
286                            panic!("A Wasm runtime is required to load user applications.");
287                        };
288                        Ok(WasmContractModule::new(contract_bytecode, wasm_runtime)
289                           .await?
290                           .into())
291                    } else {
292                        panic!(
293                            "A Wasm runtime is required to load user applications. \
294                             Please enable the `wasmer` or the `wasmtime` feature flags \
295                             when compiling `linera-storage`."
296                        );
297                    }
298                }
299            }
300            VmRuntime::Evm => {
301                cfg_if::cfg_if! {
302                    if #[cfg(with_revm)] {
303                        let evm_runtime = EvmRuntime::Revm;
304                        Ok(EvmContractModule::new(contract_bytecode, evm_runtime)?
305                           .into())
306                    } else {
307                        panic!(
308                            "An Evm runtime is required to load user applications. \
309                             Please enable the `revm` feature flag \
310                             when compiling `linera-storage`."
311                        );
312                    }
313                }
314            }
315        }
316    }
317
318    /// Creates a [`linera-sdk::UserContract`] instance using the bytecode in storage referenced
319    /// by the `application_description`.
320    async fn load_service(
321        &self,
322        application_description: &ApplicationDescription,
323        txn_tracker: &TransactionTracker,
324    ) -> Result<UserServiceCode, ExecutionError> {
325        let service_bytecode_blob_id = application_description.service_bytecode_blob_id();
326        let content = match txn_tracker.get_blob_content(&service_bytecode_blob_id) {
327            Some(content) => content.clone(),
328            None => self
329                .read_blob(service_bytecode_blob_id)
330                .await?
331                .ok_or(ExecutionError::BlobsNotFound(vec![
332                    service_bytecode_blob_id,
333                ]))?
334                .into_content(),
335        };
336        let compressed_service_bytecode = CompressedBytecode {
337            compressed_bytes: content.into_arc_bytes(),
338        };
339        #[cfg_attr(not(any(with_wasm_runtime, with_revm)), allow(unused_variables))]
340        let service_bytecode = self
341            .thread_pool()
342            .run_send((), move |()| async move {
343                compressed_service_bytecode.decompress()
344            })
345            .await
346            .await??;
347        match application_description.module_id.vm_runtime {
348            VmRuntime::Wasm => {
349                cfg_if::cfg_if! {
350                    if #[cfg(with_wasm_runtime)] {
351                        let Some(wasm_runtime) = self.wasm_runtime() else {
352                            panic!("A Wasm runtime is required to load user applications.");
353                        };
354                        Ok(WasmServiceModule::new(service_bytecode, wasm_runtime)
355                           .await?
356                           .into())
357                    } else {
358                        panic!(
359                            "A Wasm runtime is required to load user applications. \
360                             Please enable the `wasmer` or the `wasmtime` feature flags \
361                             when compiling `linera-storage`."
362                        );
363                    }
364                }
365            }
366            VmRuntime::Evm => {
367                cfg_if::cfg_if! {
368                    if #[cfg(with_revm)] {
369                        let evm_runtime = EvmRuntime::Revm;
370                        Ok(EvmServiceModule::new(service_bytecode, evm_runtime)?
371                           .into())
372                    } else {
373                        panic!(
374                            "An Evm runtime is required to load user applications. \
375                             Please enable the `revm` feature flag \
376                             when compiling `linera-storage`."
377                        );
378                    }
379                }
380            }
381        }
382    }
383
384    async fn block_exporter_context(
385        &self,
386        block_exporter_id: u32,
387    ) -> Result<Self::BlockExporterContext, ViewError>;
388}
389
390/// The result of processing the obtained read certificates.
391pub enum ResultReadCertificates {
392    Certificates(Vec<ConfirmedBlockCertificate>),
393    InvalidHashes(Vec<CryptoHash>),
394}
395
396impl ResultReadCertificates {
397    /// Creating the processed read certificates.
398    pub fn new(
399        certificates: Vec<Option<ConfirmedBlockCertificate>>,
400        hashes: Vec<CryptoHash>,
401    ) -> Self {
402        let (certificates, invalid_hashes) = certificates
403            .into_iter()
404            .zip(hashes)
405            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(certificate, hash)| match certificate {
406                Some(cert) => itertools::Either::Left(cert),
407                None => itertools::Either::Right(hash),
408            });
409        if invalid_hashes.is_empty() {
410            Self::Certificates(certificates)
411        } else {
412            Self::InvalidHashes(invalid_hashes)
413        }
414    }
415}
416
417/// The result of processing the obtained read confirmed blocks.
418pub enum ResultReadConfirmedBlocks {
419    Blocks(Vec<ConfirmedBlock>),
420    InvalidHashes(Vec<CryptoHash>),
421}
422
423impl ResultReadConfirmedBlocks {
424    /// Creating the processed read confirmed blocks.
425    pub fn new(blocks: Vec<Option<ConfirmedBlock>>, hashes: Vec<CryptoHash>) -> Self {
426        let (blocks, invalid_hashes) = blocks
427            .into_iter()
428            .zip(hashes)
429            .partition_map::<Vec<_>, Vec<_>, _, _, _>(|(block, hash)| match block {
430                Some(block) => itertools::Either::Left(block),
431                None => itertools::Either::Right(hash),
432            });
433        if invalid_hashes.is_empty() {
434            Self::Blocks(blocks)
435        } else {
436            Self::InvalidHashes(invalid_hashes)
437        }
438    }
439}
440
/// An implementation of `ExecutionRuntimeContext` suitable for the core protocol.
///
/// The thread pool and the code caches are held in `Arc`s, so clones of a context share them.
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
    /// Storage used to load application code, blobs, events and the network description.
    storage: S,
    /// The chain ID reported by `ExecutionRuntimeContext::chain_id`.
    chain_id: ChainId,
    /// Thread pool handed out by `ExecutionRuntimeContext::thread_pool`.
    thread_pool: Arc<linera_execution::ThreadPool>,
    /// Configuration reported by `ExecutionRuntimeContext::execution_runtime_config`.
    execution_runtime_config: ExecutionRuntimeConfig,
    /// Cache of loaded contract code keyed by application ID; filled by `get_user_contract`.
    user_contracts: Arc<papaya::HashMap<ApplicationId, UserContractCode>>,
    /// Cache of loaded service code keyed by application ID; filled by `get_user_service`.
    user_services: Arc<papaya::HashMap<ApplicationId, UserServiceCode>>,
}
451
452#[cfg_attr(not(web), async_trait)]
453#[cfg_attr(web, async_trait(?Send))]
454impl<S: Storage> ExecutionRuntimeContext for ChainRuntimeContext<S> {
455    fn chain_id(&self) -> ChainId {
456        self.chain_id
457    }
458
459    fn thread_pool(&self) -> &Arc<linera_execution::ThreadPool> {
460        &self.thread_pool
461    }
462
463    fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
464        self.execution_runtime_config
465    }
466
467    fn user_contracts(&self) -> &Arc<papaya::HashMap<ApplicationId, UserContractCode>> {
468        &self.user_contracts
469    }
470
471    fn user_services(&self) -> &Arc<papaya::HashMap<ApplicationId, UserServiceCode>> {
472        &self.user_services
473    }
474
475    async fn get_user_contract(
476        &self,
477        description: &ApplicationDescription,
478        txn_tracker: &TransactionTracker,
479    ) -> Result<UserContractCode, ExecutionError> {
480        let application_id = description.into();
481        let pinned = self.user_contracts.pin_owned();
482        if let Some(contract) = pinned.get(&application_id) {
483            return Ok(contract.clone());
484        }
485        let contract = self.storage.load_contract(description, txn_tracker).await?;
486        pinned.insert(application_id, contract.clone());
487        Ok(contract)
488    }
489
490    async fn get_user_service(
491        &self,
492        description: &ApplicationDescription,
493        txn_tracker: &TransactionTracker,
494    ) -> Result<UserServiceCode, ExecutionError> {
495        let application_id = description.into();
496        let pinned = self.user_services.pin_owned();
497        if let Some(service) = pinned.get(&application_id) {
498            return Ok(service.clone());
499        }
500        let service = self.storage.load_service(description, txn_tracker).await?;
501        pinned.insert(application_id, service.clone());
502        Ok(service)
503    }
504
505    async fn get_blob(&self, blob_id: BlobId) -> Result<Option<Blob>, ViewError> {
506        self.storage.read_blob(blob_id).await
507    }
508
509    async fn get_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
510        self.storage.read_event(event_id).await
511    }
512
513    async fn get_network_description(&self) -> Result<Option<NetworkDescription>, ViewError> {
514        self.storage.read_network_description().await
515    }
516
517    async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
518        self.storage.contains_blob(blob_id).await
519    }
520
521    async fn contains_event(&self, event_id: EventId) -> Result<bool, ViewError> {
522        self.storage.contains_event(event_id).await
523    }
524
525    #[cfg(with_testing)]
526    async fn add_blobs(
527        &self,
528        blobs: impl IntoIterator<Item = Blob> + Send,
529    ) -> Result<(), ViewError> {
530        let blobs = Vec::from_iter(blobs);
531        self.storage.write_blobs(&blobs).await
532    }
533
534    #[cfg(with_testing)]
535    async fn add_events(
536        &self,
537        events: impl IntoIterator<Item = (EventId, Vec<u8>)> + Send,
538    ) -> Result<(), ViewError> {
539        self.storage.write_events(events).await
540    }
541}
542
/// A clock that can be used to get the current `Timestamp`.
#[cfg_attr(not(web), async_trait)]
#[cfg_attr(web, async_trait(?Send))]
pub trait Clock {
    /// Returns the current time according to this clock.
    fn current_time(&self) -> Timestamp;

    /// Waits until `delta` has passed on this clock.
    ///
    /// NOTE(review): how time advances is up to the implementation (a test clock may not
    /// follow real time) — confirm against `WallClock` / `TestClock`.
    async fn sleep(&self, delta: TimeDelta);

    /// Waits until this clock reaches `timestamp`.
    async fn sleep_until(&self, timestamp: Timestamp);
}