#![deny(clippy::large_futures)]
mod db_storage;
use std::sync::Arc;
use async_trait::async_trait;
use dashmap::{mapref::entry::Entry, DashMap};
use futures::future;
use linera_base::{
crypto::{CryptoHash, PublicKey},
data_types::{Amount, Blob, BlockHeight, TimeDelta, Timestamp, UserApplicationDescription},
identifiers::{BlobId, ChainDescription, ChainId, GenericApplicationId, UserApplicationId},
ownership::ChainOwnership,
};
use linera_chain::{
data_types::{Certificate, ChannelFullName, HashedCertificateValue},
ChainError, ChainStateView,
};
use linera_execution::{
committee::{Committee, Epoch},
system::SystemChannel,
BlobState, ChannelSubscription, ExecutionError, ExecutionRuntimeConfig,
ExecutionRuntimeContext, UserContractCode, UserServiceCode, WasmRuntime,
};
#[cfg(with_wasm_runtime)]
use linera_execution::{WasmContractModule, WasmServiceModule};
use linera_views::{
context::Context,
views::{CryptoHashView, RootView, ViewError},
};
#[cfg(with_testing)]
pub use crate::db_storage::TestClock;
pub use crate::db_storage::{DbStorage, WallClock};
#[cfg(with_metrics)]
pub use crate::db_storage::{
READ_CERTIFICATE_COUNTER, READ_HASHED_CERTIFICATE_VALUE_COUNTER, WRITE_CERTIFICATE_COUNTER,
WRITE_HASHED_CERTIFICATE_VALUE_COUNTER,
};
#[async_trait]
pub trait Storage: Sized {
type Context: Context<Extra = ChainRuntimeContext<Self>> + Clone + Send + Sync + 'static;
type Clock: Clock;
fn clock(&self) -> &Self::Clock;
async fn load_chain(&self, id: ChainId) -> Result<ChainStateView<Self::Context>, ViewError>;
async fn contains_hashed_certificate_value(&self, hash: CryptoHash) -> Result<bool, ViewError>;
async fn contains_hashed_certificate_values(
&self,
hash: Vec<CryptoHash>,
) -> Result<Vec<bool>, ViewError>;
async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError>;
async fn missing_blobs(&self, blob_ids: Vec<BlobId>) -> Result<Vec<BlobId>, ViewError>;
async fn contains_blob_state(&self, blob_id: BlobId) -> Result<bool, ViewError>;
async fn read_hashed_certificate_value(
&self,
hash: CryptoHash,
) -> Result<HashedCertificateValue, ViewError>;
async fn read_blob(&self, blob_id: BlobId) -> Result<Blob, ViewError>;
async fn read_blobs(&self, blob_ids: &[BlobId]) -> Result<Vec<Option<Blob>>, ViewError>;
async fn read_blob_state(&self, blob_id: BlobId) -> Result<BlobState, ViewError>;
async fn read_hashed_certificate_values_downward(
&self,
from: CryptoHash,
limit: u32,
) -> Result<Vec<HashedCertificateValue>, ViewError>;
async fn write_hashed_certificate_value(
&self,
value: &HashedCertificateValue,
) -> Result<(), ViewError>;
async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError>;
async fn write_blobs_and_certificate(
&self,
blobs: &[Blob],
certificate: &Certificate,
) -> Result<(), ViewError>;
async fn write_blob_state(
&self,
blob_id: BlobId,
blob_state: &BlobState,
) -> Result<(), ViewError>;
async fn maybe_write_blob_state(
&self,
blob_id: BlobId,
blob_state: BlobState,
) -> Result<Epoch, ViewError>;
async fn write_hashed_certificate_values(
&self,
values: &[HashedCertificateValue],
) -> Result<(), ViewError>;
async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;
async fn contains_certificate(&self, hash: CryptoHash) -> Result<bool, ViewError>;
async fn read_certificate(&self, hash: CryptoHash) -> Result<Certificate, ViewError>;
async fn write_certificate(&self, certificate: &Certificate) -> Result<(), ViewError>;
async fn write_certificates(&self, certificate: &[Certificate]) -> Result<(), ViewError>;
async fn load_active_chain(
&self,
id: ChainId,
) -> Result<ChainStateView<Self::Context>, linera_chain::ChainError>
where
ChainRuntimeContext<Self>: ExecutionRuntimeContext,
{
let chain = self.load_chain(id).await?;
chain.ensure_is_active()?;
Ok(chain)
}
async fn read_certificates<I: IntoIterator<Item = CryptoHash> + Send>(
&self,
keys: I,
) -> Result<Vec<Certificate>, ViewError>
where
Self: Clone + Send + 'static,
{
let mut tasks = Vec::new();
for key in keys {
let client = self.clone();
tasks.push(tokio::task::spawn(async move {
client.read_certificate(key).await
}));
}
let results = future::join_all(tasks).await;
let mut certs = Vec::new();
for result in results {
certs.push(result.expect("storage access should not cancel or crash")?);
}
Ok(certs)
}
async fn create_chain(
&self,
committee: Committee,
admin_id: ChainId,
description: ChainDescription,
public_key: PublicKey,
balance: Amount,
timestamp: Timestamp,
) -> Result<(), ChainError>
where
ChainRuntimeContext<Self>: ExecutionRuntimeContext,
{
let id = description.into();
let mut chain = self.load_chain(id).await?;
assert!(!chain.is_active(), "Attempting to create a chain twice");
chain.manager.get_mut().reset(
&ChainOwnership::single(public_key),
BlockHeight(0),
self.clock().current_time(),
committee.keys_and_weights(),
)?;
let system_state = &mut chain.execution_state.system;
system_state.description.set(Some(description));
system_state.epoch.set(Some(Epoch::ZERO));
system_state.admin_id.set(Some(admin_id));
system_state
.committees
.get_mut()
.insert(Epoch::ZERO, committee);
system_state
.ownership
.set(ChainOwnership::single(public_key));
system_state.balance.set(balance);
system_state.timestamp.set(timestamp);
if id != admin_id {
system_state.subscriptions.insert(&ChannelSubscription {
chain_id: admin_id,
name: SystemChannel::Admin.name(),
})?;
let mut admin_chain = self.load_chain(admin_id).await?;
let full_name = ChannelFullName {
application_id: GenericApplicationId::System,
name: SystemChannel::Admin.name(),
};
{
let mut channel = admin_chain.channels.try_load_entry_mut(&full_name).await?;
channel.subscribers.insert(&id)?;
} admin_chain.save().await?;
}
let state_hash = chain.execution_state.crypto_hash().await?;
chain.execution_state_hash.set(Some(state_hash));
chain.save().await?;
Ok(())
}
fn wasm_runtime(&self) -> Option<WasmRuntime>;
#[cfg(with_wasm_runtime)]
async fn load_contract(
&self,
application_description: &UserApplicationDescription,
) -> Result<UserContractCode, ExecutionError> {
let Some(wasm_runtime) = self.wasm_runtime() else {
panic!("A Wasm runtime is required to load user applications.");
};
let contract_bytecode_blob_id = BlobId::new_contract_bytecode_from_hash(
application_description.bytecode_id.contract_blob_hash,
);
let contract_blob = self.read_blob(contract_bytecode_blob_id).await?;
Ok(Arc::new(
WasmContractModule::new(
contract_blob
.into_inner_contract_bytecode()
.expect("Contract Bytecode Blob is of the wrong Blob type!")
.try_into()?,
wasm_runtime,
)
.await?,
))
}
#[cfg(not(with_wasm_runtime))]
#[allow(clippy::diverging_sub_expression)]
async fn load_contract(
&self,
_application_description: &UserApplicationDescription,
) -> Result<UserContractCode, ExecutionError> {
panic!(
"A Wasm runtime is required to load user applications. \
Please enable the `wasmer` or the `wasmtime` feature flags \
when compiling `linera-storage`."
);
}
#[cfg(with_wasm_runtime)]
async fn load_service(
&self,
application_description: &UserApplicationDescription,
) -> Result<UserServiceCode, ExecutionError> {
let Some(wasm_runtime) = self.wasm_runtime() else {
panic!("A Wasm runtime is required to load user applications.");
};
let service_bytecode_blob_id = BlobId::new_service_bytecode_from_hash(
application_description.bytecode_id.service_blob_hash,
);
let service_blob = self.read_blob(service_bytecode_blob_id).await?;
Ok(Arc::new(
WasmServiceModule::new(
service_blob
.into_inner_service_bytecode()
.expect("Service Bytecode Blob is of the wrong Blob type!")
.try_into()?,
wasm_runtime,
)
.await?,
))
}
#[cfg(not(with_wasm_runtime))]
#[allow(clippy::diverging_sub_expression)]
async fn load_service(
&self,
_application_description: &UserApplicationDescription,
) -> Result<UserServiceCode, ExecutionError> {
panic!(
"A Wasm runtime is required to load user applications. \
Please enable the `wasmer` or the `wasmtime` feature flags \
when compiling `linera-storage`."
);
}
}
#[derive(Clone)]
pub struct ChainRuntimeContext<S> {
storage: S,
chain_id: ChainId,
execution_runtime_config: ExecutionRuntimeConfig,
user_contracts: Arc<DashMap<UserApplicationId, UserContractCode>>,
user_services: Arc<DashMap<UserApplicationId, UserServiceCode>>,
}
#[async_trait]
impl<S> ExecutionRuntimeContext for ChainRuntimeContext<S>
where
S: Storage + Send + Sync,
{
fn chain_id(&self) -> ChainId {
self.chain_id
}
fn execution_runtime_config(&self) -> linera_execution::ExecutionRuntimeConfig {
self.execution_runtime_config
}
fn user_contracts(&self) -> &Arc<DashMap<UserApplicationId, UserContractCode>> {
&self.user_contracts
}
fn user_services(&self) -> &Arc<DashMap<UserApplicationId, UserServiceCode>> {
&self.user_services
}
async fn get_user_contract(
&self,
description: &UserApplicationDescription,
) -> Result<UserContractCode, ExecutionError> {
match self.user_contracts.entry(description.into()) {
Entry::Occupied(entry) => Ok(entry.get().clone()),
Entry::Vacant(entry) => {
let contract = self.storage.load_contract(description).await?;
entry.insert(contract.clone());
Ok(contract)
}
}
}
async fn get_user_service(
&self,
description: &UserApplicationDescription,
) -> Result<UserServiceCode, ExecutionError> {
match self.user_services.entry(description.into()) {
Entry::Occupied(entry) => Ok(entry.get().clone()),
Entry::Vacant(entry) => {
let service = self.storage.load_service(description).await?;
entry.insert(service.clone());
Ok(service)
}
}
}
async fn get_blob(&self, blob_id: BlobId) -> Result<Blob, ExecutionError> {
Ok(self.storage.read_blob(blob_id).await?)
}
async fn contains_blob(&self, blob_id: BlobId) -> Result<bool, ViewError> {
self.storage.contains_blob(blob_id).await
}
}
#[async_trait]
pub trait Clock {
fn current_time(&self) -> Timestamp;
async fn sleep(&self, delta: TimeDelta);
async fn sleep_until(&self, timestamp: Timestamp);
}